How does Hadoop process records split across block boundaries?

Solution 1

Interesting question. I spent some time looking at the code for the details, and here are my thoughts. The splits are computed by the client in InputFormat.getSplits, so a look at FileInputFormat gives the following info:

  • For each input file, get the file length and the block size, and calculate the split size as max(minSize, min(maxSize, blockSize)), where maxSize corresponds to mapred.max.split.size and minSize to mapred.min.split.size.
  • Divide the file into FileSplits based on the split size calculated above. What's important here is that each FileSplit is initialized with a start parameter corresponding to its offset in the input file. Lines are still not handled at this point. The relevant part of the code looks like this:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
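
To make the two steps above concrete, here is a minimal standalone sketch, not the Hadoop source. The class and method names are hypothetical, splits are plain {start, length} pairs instead of FileSplit objects, and the SPLIT_SLOP value of 1.1 and the handling of the leftover bytes after the loop are my recollection of FileInputFormat rather than something shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the split logic in FileInputFormat.getSplits.
class SplitSizeSketch {
    // Assumed slop factor: the final split may be up to 10% larger than splitSize.
    static final double SPLIT_SLOP = 1.1;

    // max(minSize, min(maxSize, blockSize)), as described in the first bullet.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns {start, length} pairs covering a file of the given length.
    static List<long[]> computeSplits(long length, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {length - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            // Assumption: whatever is left becomes the final, possibly short, split.
            splits.add(new long[] {length - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(64 * mb, 1, Long.MAX_VALUE);
        for (long[] s : computeSplits(200 * mb, splitSize)) {
            System.out.println("start=" + s[0] / mb + "MB length=" + s[1] / mb + "MB");
        }
    }
}
```

For a 200 MB file and a 64 MB split size, this prints four splits starting at 0, 64, 128 and 192 MB, the last one 8 MB long. Note that nothing here looks at the file content, only at byte offsets.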
    

After that, if you look at LineRecordReader, which is the record reader used by TextInputFormat, that's where the lines are handled:

  • When you initialize your LineRecordReader, it tries to instantiate a LineReader, which is an abstraction for reading lines from an FSDataInputStream. There are 2 cases:
      • If there is a CompressionCodec defined, then this codec is responsible for handling boundaries. Probably not relevant to your question.
      • If there is no codec, however, things get interesting: if the start of your InputSplit is not 0, the reader backtracks 1 character and then skips the first line it encounters, terminated by \n or \r\n (Windows). The backtrack is important: if a line boundary coincides with a split boundary, it ensures that you do not skip a valid line. Here is the relevant code:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
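
The effect of the backtrack is easier to see on a tiny in-memory example. The following is a simplified sketch under my own assumptions, not the real LineRecordReader: the class and method names are hypothetical, only \n is treated as a line terminator, and the read loop uses the condition pos < end, which is how I read the older reader.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the "backtrack one byte, skip first line" logic.
class BacktrackReaderSketch {
    // Reads one '\n'-terminated line starting at pos. Returns the number of bytes
    // consumed (newline included) and appends the text to out when out != null.
    static int readLine(byte[] data, int pos, StringBuilder out) {
        int i = pos;
        while (i < data.length && data[i] != '\n') {
            if (out != null) out.append((char) data[i]);
            i++;
        }
        if (i < data.length) i++; // consume the newline itself
        return i - pos;
    }

    // Lines emitted by the reader assigned to the split [start, start + length).
    static List<String> linesForSplit(byte[] data, int start, int length) {
        int end = start + length;
        int pos = start;
        if (start != 0) {
            // Back up one byte, then discard everything up to the next newline.
            pos = start - 1;
            pos += readLine(data, pos, null);
        }
        List<String> lines = new ArrayList<>();
        // A line is read if it starts before the split end, even if it ends later.
        while (pos < end && pos < data.length) {
            StringBuilder sb = new StringBuilder();
            pos += readLine(data, pos, sb);
            lines.add(sb.toString());
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "aaaa\nbbbbbbbb\ncc\n".getBytes(StandardCharsets.US_ASCII);
        // The boundary at offset 5 coincides exactly with the start of "bbbbbbbb".
        System.out.println(linesForSplit(data, 0, 5));  // [aaaa]
        System.out.println(linesForSplit(data, 5, 5));  // [bbbbbbbb]
        System.out.println(linesForSplit(data, 10, 7)); // [cc]
    }
}
```

In the second call the split starts exactly on a line boundary. Backing up one byte means the "first line" that gets discarded is just the trailing newline of the previous line, so "bbbbbbbb" is kept. Without the backtrack, this sketch would drop that line, since the first reader stops at offset 5 and never reads it.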
    

So since the splits are calculated in the client, the mappers don't need to run in sequence; every mapper already knows whether it needs to discard the first line or not.

So basically, suppose you have 2 lines of 100 MB each in the same file and, to simplify, a split size of 64 MB. When the input splits are calculated, we get the following:

  • Split 1 containing the path and the hosts for this block. Initialized at start 200-200=0 MB, length 64 MB.
  • Split 2 initialized at start 200-200+64=64 MB, length 64 MB.
  • Split 3 initialized at start 200-200+128=128 MB, length 64 MB.
  • Split 4 initialized at start 200-200+192=192 MB, length 8 MB.

The mappers then proceed as follows:

  • Mapper A processes split 1: start is 0, so it does not skip the first line. It reads a full line, which goes beyond the 64 MB limit and so needs a remote read.
  • Mapper B processes split 2: start != 0, so it skips the first line after 64 MB - 1 byte. That skip ends at the end of line 1 at 100 MB, which is still inside split 2. Line 2 then has 28 MB in split 2, so the remaining 72 MB are read remotely.
  • Mapper C processes split 3: start != 0, so it skips the first line after 128 MB - 1 byte. That skip ends at the end of line 2 at 200 MB, which is the end of the file, so there is nothing left to process.
  • Mapper D is the same as mapper C, except that it looks for a newline after 192 MB - 1 byte.
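
The arithmetic in this scenario can be checked with a few lines of code. This is only an illustrative sketch with hypothetical names. It assumes that a line is processed by the split in which it starts and that every split except the last has the full split size, and it works in MB to match the numbers above.

```java
// Hypothetical helper that reproduces the 2 x 100 MB line example in MB units.
class RemoteReadSketch {
    // Zero-based index of the split whose byte range contains lineStart.
    static int owningSplit(long lineStart, long splitSize) {
        return (int) (lineStart / splitSize);
    }

    // Part of the line [lineStart, lineEnd) that lies past the end of its owning split.
    static long bytesBeyondSplit(long lineStart, long lineEnd, long splitSize) {
        long splitEnd = (owningSplit(lineStart, splitSize) + 1) * splitSize;
        return Math.max(0, lineEnd - splitEnd);
    }

    public static void main(String[] args) {
        long splitSize = 64;                     // MB
        long[][] lines = {{0, 100}, {100, 200}}; // {start, end} of each line in MB
        for (long[] line : lines) {
            int split = owningSplit(line[0], splitSize) + 1; // 1-based, as in the text
            long beyond = bytesBeyondSplit(line[0], line[1], splitSize);
            System.out.println("line " + line[0] + "-" + line[1] + "MB -> split " + split
                + ", " + beyond + "MB read past the split end");
        }
    }
}
```

Line 1 lands in split 1 with 36 MB past the split end, and line 2 lands in split 2 with 72 MB past the split end. Splits 3 and 4 own no line start, which is why mappers C and D have nothing to process.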

Solution 2

MapReduce does not work on physical blocks of the file; it works on logical input splits. Where an input split ends depends on where the records were written, and a record may span the data handled by two mappers.

The way HDFS has been set up, it breaks down very large files into large blocks (for example, measuring 128MB), and stores three copies of these blocks on different nodes in the cluster.

HDFS has no awareness of the content of these files. A record may start in Block-a while its end lies in Block-b.

To solve this problem, Hadoop uses a logical representation of the data stored in file blocks, known as input splits. When a MapReduce job client calculates the input splits, it figures out where the first whole record in a block begins and where the last record in the block ends.

The key point:

In cases where the last record in a block is incomplete, the input split includes location information for the next block and the byte offset of the data needed to complete the record.

Have a look at the diagram below.

[Diagram: image not available]

Have a look at this article and the related SE question: About Hadoop/HDFS file splitting

More details can be found in the documentation:

The Map-Reduce framework relies on the InputFormat of the job to:

  1. Validate the input-specification of the job.
  2. Split-up the input file(s) into logical InputSplits, each of which is then assigned to an individual Mapper.
  3. Provide the RecordReader implementation used to read input records from the logical InputSplit for processing by the Mapper.

InputSplit[] getSplits(JobConf job, int numSplits) is the API that takes care of the splitting.

FileInputFormat, which extends InputFormat, implements the getSplits() method. Have a look at the internals of this method at grepcode.

Solution 3

I see it as follows: InputFormat is responsible for splitting the data into logical splits, taking into account the nature of the data.
Nothing prevents it from doing so, although it can add significant latency to the job: all the logic and reading around the desired split size boundaries will happen in the jobtracker.
The simplest record-aware input format is TextInputFormat. It works as follows (as far as I understand from the code): the input format creates splits by size, regardless of the lines, but LineRecordReader always:
a) Skips the first line in the split (or part of it), if it is not the first split.
b) Reads one line past the end boundary of the split (if data is available, that is, if it is not the last split).

Solution 4

From what I've understood, when the FileSplit is initialized for the first block, the default constructor is called. Therefore the values for start and length are zero initially. By the end of processing of the first block, if the last line is incomplete, the value of length will be greater than the length of the split and it will read the first line of the next block as well. Because of this, the value of start for the second block will be greater than zero, and under this condition the LineRecordReader will skip the first line of the second block. (See source)

If the last line of the first block is complete, then the value of length will be equal to the length of the first block and the value of start for the second block will be zero. In that case the LineRecordReader will not skip the first line and will read the second block from the beginning.

Makes sense?

Solution 5

In the Hadoop source code of LineRecordReader.java, in the constructor, I found these comments:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

From this I believe Hadoop reads one extra line for each split (at the end of the current split, it reads the next line, which lies in the next split), and if it is not the first split, the first line is thrown away. That way no line record is lost or left incomplete.
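
The rule described in those comments can be modeled in memory as follows. This is a sketch under my own assumptions rather than the actual LineRecordReader: the names are hypothetical, only \n terminates a line, and the loop condition pos <= end is my reading of the "one extra line" comment.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "always drop the first line, always read one extra line".
class ExtraLineReaderSketch {
    // Reads one '\n'-terminated line starting at pos. Returns the number of bytes
    // consumed (newline included) and appends the text to out when out != null.
    static int readLine(byte[] data, int pos, StringBuilder out) {
        int i = pos;
        while (i < data.length && data[i] != '\n') {
            if (out != null) out.append((char) data[i]);
            i++;
        }
        if (i < data.length) i++; // consume the newline itself
        return i - pos;
    }

    // Lines emitted by the reader assigned to the split [start, start + length).
    static List<String> linesForSplit(byte[] data, int start, int length) {
        int end = start + length;
        int pos = start;
        if (start != 0) {
            // Not the first split: discard the first (possibly partial) line.
            pos += readLine(data, pos, null);
        }
        List<String> lines = new ArrayList<>();
        // "<=" instead of "<": a line starting exactly at end is the extra line.
        while (pos <= end && pos < data.length) {
            StringBuilder sb = new StringBuilder();
            pos += readLine(data, pos, sb);
            lines.add(sb.toString());
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "aaaa\nbbbbbbbb\ncc\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(linesForSplit(data, 0, 5));  // [aaaa, bbbbbbbb]
        System.out.println(linesForSplit(data, 5, 5));  // []
        System.out.println(linesForSplit(data, 10, 7)); // [cc]
    }
}
```

The first split reads the extra line "bbbbbbbb" that starts exactly at its end, and the second split discards that same line as its first line, so each line is still emitted exactly once across the three splits.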

Author: Praveen Sripati

Updated on November 07, 2020

Comments

  • Praveen Sripati over 3 years

    According to Hadoop: The Definitive Guide:

    The logical records that FileInputFormats define do not usually fit neatly into HDFS blocks. For example, a TextInputFormat’s logical records are lines, which will cross HDFS boundaries more often than not. This has no bearing on the functioning of your program—lines are not missed or broken, for example—but it’s worth knowing about, as it does mean that data-local maps (that is, maps that are running on the same host as their input data) will perform some remote reads. The slight overhead this causes is not normally significant.

    Suppose a record line is split across two blocks (b1 and b2). The mapper processing the first block (b1) will notice that the last line doesn't have an EOL separator and will fetch the remainder of the line from the next block of data (b2).

    How does the mapper processing the second block (b2) determine that the first record is incomplete and should process starting from the second record in the block (b2)?

  • Praveen Sripati over 11 years
    "Skip first line in the split (or part of it), if it is not the first split": if the first record in a non-first block is complete, then I am not sure how this logic will work.
  • David Gruzman over 11 years
    As far as I can see from the code, each split reads what it has plus the next line. So if the line break is not on the block boundary, it is OK. How exactly the case where the line break falls exactly on the block boundary is handled still has to be understood; I will read the code a bit more.
  • Praveen Sripati over 11 years
    In this scenario, the mappers have to communicate with each other and process the blocks in sequence when the last line in a particular block is not complete. Not sure if this is the way it works.
  • Charles Menguy over 11 years
    Also @PraveenSripati it's worth mentioning that the edge case where a boundary falls at the \r of a \r\n line ending is handled in the LineReader.readLine function. I don't think it's relevant to your question, but I can add more details if needed.
  • Praveen Sripati over 11 years
    Let's assume there are two lines of exactly 64 MB each in the input, so that the InputSplits fall exactly at the line boundaries. Will the mapper then always ignore the line in the second block because start != 0?
  • Charles Menguy over 11 years
    @PraveenSripati In that case, the second mapper will see start != 0, so backtrack 1 character, which brings you back just before the \n of the first line and then skip up to the following \n. So it will skip the first line but process the second line as expected.
  • Kobe-Wan Kenobi over 9 years
    @CharlesMenguy is it possible that the first line of the file gets skipped somehow? Concretely, I have first line with key=1, and value a, then there are two more lines with the same key somewhere in the file, key=1, val=b and key=1, val=c. The thing is, my reducer gets {1, [b,c]} and {1, [a]}, instead of {1, [a,b,c]}. This doesn't happen if I add new line to the beginning of my file. What could be the reason, Sir?
  • CᴴᴀZ over 7 years
    @CharlesMenguy What if the file on HDFS is a binary file (as opposed to text file, in which \r\n, \n represents record truncation)?