spark read wholeTextFiles with non UTF-8 encoding

12,841

Solution 1

It's Simple.

Here is the source code,

import java.nio.charset.Charset

import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object TextFile {
  val DEFAULT_CHARSET = Charset.forName("UTF-8")

  def withCharset(context: SparkContext, location: String, charset: String): RDD[String] = {
    if (Charset.forName(charset) == DEFAULT_CHARSET) {
      context.textFile(location)
    } else {
      // can't pass a Charset object here cause its not serializable
      // TODO: maybe use mapPartitions instead?
      context.hadoopFile[LongWritable, Text, TextInputFormat](location).map(
        pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)
      )
    }
  }
}

From here it's copied.

https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/util/TextFile.scala

To Use it.

https://github.com/databricks/spark-csv/blob/master/src/test/scala/com/databricks/spark/csv/util/TextFileSuite.scala

Edit:

If you need wholetext file,

Here is the actual source of the implementation.

def wholeTextFiles(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
    assertNotStopped()
    val job = NewHadoopJob.getInstance(hadoopConfiguration)
    // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
    // comma separated files as input. (see SPARK-7155)
    NewFileInputFormat.setInputPaths(job, path)
    val updateConf = job.getConfiguration
    new WholeTextFileRDD(
      this,
      classOf[WholeTextFileInputFormat],
      classOf[Text],
      classOf[Text],
      updateConf,
      minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
  }

Try changing :

.map(record => (record._1.toString, record._2.toString))

to(probably):

.map(record => (record._1.toString, new String(record._2.getBytes, 0, record._2.getLength, "myCustomCharset")))

Solution 2

You can read the files using SparkContext.binaryFiles() instead and build the String for the contents specifying the charset you need. E.g:

val df = spark.sparkContext.binaryFiles(path, 12)
  .mapValues(content => new String(content.toArray(), StandardCharsets.ISO_8859_1))
  .toDF
Share:
12,841

Related videos on Youtube

Georg Heiler
Author by

Georg Heiler

I am a Ph.D. candidate at the Vienna University of Technology and Complexity Science Hub Vienna as well as a data scientist in the industry.

Updated on June 04, 2022

Comments

  • Georg Heiler
    Georg Heiler almost 2 years

    I want to read whole text files in non UTF-8 encoding via

    val df = spark.sparkContext.wholeTextFiles(path, 12).toDF
    

    into spark. How can I change the encoding? I would want to read ISO-8859 encoded text, but it is not CSV, it is something similar to xml:SGML.

    edit

    maybe a custom Hadoop file input format should be used?

  • Georg Heiler
    Georg Heiler about 7 years
    That does seem to be a good start. But How can I achieve similar behavior of wholeTextFiles where the file path is contained as a key and a single row will be created per file unlike the spark-csv variant above, where the lines in a file will be tokenized into rows in the RDD.
  • RBanerjee
    RBanerjee about 7 years
  • Georg Heiler
    Georg Heiler about 7 years
    I think this is nearly there. But .map(record => (record._1.toString, record._2.toString, "iso-8859-1")).setName(path) will fail to compile due to found : org.apache.spark.rdd.RDD[(String, String, String)] [error] required: org.apache.spark.rdd.RDD[(String, String)]
  • Georg Heiler
    Georg Heiler about 7 years
    And I am not sure if WholeTextFileInputFormat wouldn't need to be redefined using the desired encoding?
  • Georg Heiler
    Georg Heiler about 7 years
    Would I be required to replace RecordReader[Text, Text] with a custom Textimplementation which is not using UTF-8?
  • abhiyenta
    abhiyenta about 6 years
    See now, this is simple!