Python MapReduce Hadoop Streaming Job that requires multiple input files?


Solution 1

Please take a look at this example as it relates pretty much directly to what you're looking for.

Solution 2

Is File A really that large? I would put it in the DistributedCache and read it from there. To put it in the distributed cache, use this option in the Hadoop streaming call:

-cacheFile 'hdfs://namenode:port/the/hdfs/path/to/FileA#FileA'

(I suppose the following should work too, but I have not tried it:)

-cacheFile '/the/hdfs/path/to/FileA#FileA'

Note that #FileA (the part after the #) is the name you use to make the file available to your mappers.
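
Putting it together, the full streaming call might look roughly like this (the jar location and the output directory are placeholders; adjust them to your installation):

    hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
        -input '/user/foo/FileB' \
        -output '/user/foo/fruit-counts' \
        -mapper mapper.py \
        -reducer reducer.py \
        -file mapper.py \
        -file reducer.py \
        -cacheFile 'hdfs://namenode:port/the/hdfs/path/to/FileA#FileA'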

Then, in your mapper, you'll read FileB from sys.stdin (assuming you called Hadoop Streaming using -input '/user/foo/FileB') and, to read FileA, you should do something like this:

f = open('FileA', 'r')  # 'FileA' is the symlink name chosen after the '#' in -cacheFile
...
f.readline()

Now, I suppose you have already thought of this, but to me it would make sense to have a mapper like this (a sketch in code follows the list):

  1. Open FileA
  2. Read FileA line by line (in a loop) and load it into a map so that you can easily look up a key and find its value (Yes/No).
  3. Have your main loop reading from stdin. Within the loop, for each line (in FileB), check your map (see step 2) to find out whether you have a fruit or not... etc.
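
A minimal sketch of such a mapper, assuming FileA was shipped via -cacheFile as #FileA and that both files use " | " as the field separator, as in the sample data:

    #!/usr/bin/env python
    import sys

    # Steps 1-2: load FileA (the DistributedCache symlink) into a dict
    # mapping each food item to True/False for "is a fruit".
    is_fruit = {}
    with open('FileA') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue  # skip blank lines and the header comments
            food, flag = [part.strip() for part in line.split('|')]
            is_fruit[food] = (flag == 'Yes')

    # Step 3: main loop over FileB on stdin; emit the vendor for each fruit row.
    for line in sys.stdin:
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        food, vendor = [part.strip() for part in line.split('|')]
        if is_fruit.get(food):
            print(vendor + '\t1')

The matching reducer is then the usual streaming word-count pattern, summing the 1s per vendor (streaming sorts the mapper output by key, so each vendor's rows arrive grouped):

    #!/usr/bin/env python
    import sys

    current_vendor = None
    count = 0
    for line in sys.stdin:
        vendor, _, value = line.strip().partition('\t')
        if vendor != current_vendor:
            if current_vendor is not None:
                print(current_vendor + ' | ' + str(count))  # e.g. "Vendor A | 2"
            current_vendor = vendor
            count = 0
        count += int(value)
    if current_vendor is not None:
        print(current_vendor + ' | ' + str(count))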

Solution 3

One approach is to do this as two jobs.

  1. Filter FileB so that only rows containing fruits are retained
    • Map1: A composite key of "Food Item" and which file the data came from. Partition on "Food Item", with a secondary sort on whether the row contains the "Is_A_Fruit" information (to ensure that the reducer reads it first for each food item).
    • Reduce1: Thanks to the secondary sort, the first row in the sorted data indicates whether this Food Item is a fruit; if it is, the reducer outputs the remaining (vendor) rows for that item, and if not, it drops them. (Both halves of this job are sketched after the list.)
  2. Use the vendor as key to count the number of fruits per vendor.
    • The output from the first job now has the same structure as FileB, but every row is a fruit, so this is much like word count: use Vendor Name as the key and count the number of rows.
    • If you want unique fruits, you may need to use a secondary sort again to eliminate the need to load all fruits associated to each vendor into memory.
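
Here is a rough sketch of job 1 under those assumptions. The mapper tells FileA rows apart from FileB rows by their structure (whether the second field is Yes/No) and tags the composite key so that the Is_A_Fruit row sorts first; the ^ separator and the field layout are assumptions taken from the sample data:

    #!/usr/bin/env python
    import sys

    # Map1: emit "<Food Item>^<tag>" where tag 0 marks an Is_A_Fruit row
    # and tag 1 a vendor row, so the Is_A_Fruit row sorts first per item.
    for line in sys.stdin:
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        food, value = [part.strip() for part in line.split('|')]
        if value in ('Yes', 'No'):   # a FileA row, identified by its structure
            print(food + '^0\t' + value)
        else:                        # a FileB row: the value is the vendor
            print(food + '^1\t' + value)

and the corresponding reducer:

    #!/usr/bin/env python
    import sys

    # Reduce1: with partitioning on the food item and sorting on the full key,
    # the ^0 (Is_A_Fruit) row is the first one seen for each food item.
    current_food = None
    fruit = False
    for line in sys.stdin:
        key, _, value = line.strip().partition('\t')
        food, tag = key.split('^')
        if food != current_food:
            current_food, fruit = food, False
        if tag == '0':
            fruit = (value == 'Yes')
        elif fruit:
            print(food + ' | ' + value)  # retain only fruit rows, FileB layout

Job 2 is then an ordinary word count over this output with Vendor Name as the key; the partitioning flags job 1 relies on are shown below.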

That said: the solution @cabad suggests is best if the file is small enough.

If not, the approach with secondary sorts is best. Take a look at the tutorial suggested in @Simplefish's answer for how to do secondary sorts within a partition (those are the keywords that will point you in the right direction to do what you want: make guarantees about the order of the data associated with a given key as it is passed to a reducer).
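
In streaming terms, the partition-within-key setup for job 1 above looks roughly like this (mapper1.py, reducer1.py, and the paths are placeholders; -jobconf is the older spelling, newer releases use -D and have renamed some of these properties):

    hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
        -jobconf map.output.key.field.separator=^ \
        -jobconf num.key.fields.for.partition=1 \
        -input '/user/foo/FileA' -input '/user/foo/FileB' \
        -output '/user/foo/fruits-only' \
        -mapper mapper1.py -reducer reducer1.py \
        -file mapper1.py -file reducer1.py

This splits each key on ^ and partitions on the first part (the food item) while still sorting on the full key, which provides exactly the ordering guarantee the secondary sort needs.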

One final note: your problem isn't "how to read from multiple files", as any solution you devise can't rely on knowing which file a given input line came from (you'll need to rely on the structure of the data instead, although that isn't an issue in this example).

Solution 4

You only provide the directory that contains the files to Hadoop; the Hadoop framework reads them, not you.

Hadoop will apply the map class you wrote to the contents of all the files.

Then Hadoop will apply your reduce class to all the output from the map class.
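
For instance (paths are placeholders), pointing -input at a directory makes Hadoop run your mapper over every file inside it:

    hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
        -input '/user/foo/inputdir' \
        -output '/user/foo/outputdir' \
        -mapper mapper.py \
        -reducer reducer.py \
        -file mapper.py \
        -file reducer.py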

Comments

  • ComputerFellow almost 2 years

    I have two files in my cluster, File A and File B, with the following data:

    File A

    #Format: 
    #Food Item | Is_A_Fruit (BOOL)
    
    Orange | Yes
    Pineapple | Yes
    Cucumber | No
    Carrot | No
    Mango | Yes
    

    File B

    #Format:
    #Food Item | Vendor Name
    
    Orange | Vendor A
    Pineapple | Vendor B
    Cucumber | Vendor B
    Carrot | Vendor B
    Mango | Vendor A
    

    Basically, I want to find out: how many fruits is each vendor selling?

    Expected output:

    Vendor A | 2
    Vendor B | 1
    

    I need to do this using Hadoop Streaming with Python MapReduce.

    I have read how to do a basic word count: read from sys.stdin and emit (k, v) pairs for the reducer to then reduce.

    How do I approach this problem?

    My main concern is how to read from multiple files and then compare them in Hadoop Streaming.

    I can do this in normal Python (i.e., without MapReduce and Hadoop it's straightforward), but that's infeasible given the sheer size of the data I have.

  • cohoz
    cohoz over 10 years
    +1. Initially I was a little skeptical that the example actually worked as suggested, because the author there buries the important part: separating the partitioning and the sorting, which is done via "-jobconf stream.num.map.output.key.fields=4 -jobconf map.output.key.field.separator=^ -jobconf num.key.fields.for.partition=1".