Possible to handle a multi-character delimiter in Spark?

import spark.implicits._  // needed for toDF() on an RDD

val df = spark.read.format("csv").load("inputpath")
df.rdd.map(i => i.mkString.split("\\[\\~\\]")).toDF().show(false)

Try the above; note that split takes a regular expression, which is why the [ and ] of the delimiter are escaped.

For your other requirement (expanding the array into separate columns), try:

import org.apache.spark.sql.functions.{col, split}
// re-join the fields with "," into one string column, then expand into col_0 ... col_n
val df1 = df.rdd.map(i => i.mkString.split("\\[\\~\\]").mkString(",")).toDF()
val iterationColumnLength = df1.rdd.first.mkString(",").split(",").length
df1.withColumn("value", split(col("value"), ","))
  .select((0 until iterationColumnLength).map(i => col("value").getItem(i).as("col_" + i)): _*).show


Author: test acc
Updated on June 07, 2022

Comments

  • test acc, almost 2 years ago

    I have [~] as my delimiter for some csv files I am reading.

    1[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
    

    I have tried this

    val rddFile = sc.textFile("file.csv")
    val rddTransformed = rddFile.map(eachLine=>eachLine.split("[~]"))
    val df = rddTransformed.toDF()
    display(df)
    

    However, the issue with this is that "[~]" is interpreted as a regex character class matching only ~, so each line is split on ~ alone and the [ and ] characters remain in each field. The resulting array would be

    ["1[","]a[","]b[",...]
    

    I can't use

    val df = spark.read.option("sep", "[~]").csv("file.csv")
    

    Because a multi-character separator is not supported. What other approach can I take?

    Updated test data:

    1[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
    2[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
    3[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
    

    Edit: this is not a duplicate. The linked thread is about multiple different delimiters; this question is about a single multi-character delimiter.

  • test acc, over 5 years ago
    The issue with this is the same as with what I used, rddFile.map(eachLine => eachLine.split("\\[\\~\\]")) (when correctly escaping the split characters): split returns an array, which ends up in a single column of array type instead of being split into separate columns. An alternative way to escape the delimiter is sketched after these comments.
  • Chandan Ray, over 5 years ago
    Updated the answer to convert the single column into multiple columns.
  • test acc, over 5 years ago
    This portion returned a blank DataFrame: df1.withColumn("value", explode(col("value"))).withColumn("index", lit(1)).groupBy("index").pivot("value").sum("index").filter(col("index") =!= 1).drop("index", "").show. df1 had a large number of records, but this transformation produced an empty result.
  • Chandan Ray, over 5 years ago
    That's correct. But you have no rows, correct? Could you please send the exact test data? The code only converts the single array column into multiple columns based on the number of values in the array. Please send your file.csv with a few sample records.
  • test acc, over 5 years ago
    I updated the OP with test data that's not working. I have multiple rows. However, even if the test data were only one row, it should still work.
  • Chandan Ray, over 5 years ago
    Updated as per your requirement. Please check.
  • test acc, over 5 years ago
    Hey, I have one more quick question. I don't think this is possible, but I have a predefined schema object. Normally I would do spark.read.option("sep", ",").option("badRecordsPath", badRecordLoc).schema(schema).csv(inputLoc). By passing a schema along with a bad-record path, rows with an incorrect number of columns are written to the bad-record location. Would it be possible to add this to your implementation? I'm assuming it's not possible.
  • Chandan Ray, over 5 years ago
    @testacc I would suggest filtering out those rows based on a condition; you cannot use that option with this approach. A rough sketch of such a filter is included after these comments.
  • thebluephantom, over 5 years ago
    I noted it did not entirely work.
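
On the escaping discussed in the comments above: since split takes a regular expression, one way to avoid hand-escaping the delimiter, sketched here rather than taken from the thread, is to quote it:

import java.util.regex.Pattern

// Pattern.quote("[~]") produces \Q[~]\E, which matches the delimiter literally
val fields = "1[~]a[~]b[~]dd".split(Pattern.quote("[~]"))
// fields: Array(1, a, b, dd)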
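
On the badRecordsPath question near the end of the comments: the suggestion there is to filter out malformed rows yourself rather than rely on the CSV reader. A minimal sketch of that idea (assuming the file.csv name from the question; expectedCols is a hypothetical count taken from your schema):

import org.apache.spark.sql.functions.{col, size, split}

val expectedCols = 14  // hypothetical: the number of fields the schema expects

val arr = spark.read.text("file.csv")
  .withColumn("parts", split(col("value"), "\\[~\\]"))

// keep well-formed rows; collect the rest instead of writing them to badRecordsPath
val good = arr.filter(size(col("parts")) === expectedCols)
val bad  = arr.filter(size(col("parts")) =!= expectedCols)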