Hadoop MapReduce job with HDFS input and HBase output
Solution 1
Here is the code that will solve your problem.
Driver
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "JOB_NAME");
job.setJarByClass(YourClass.class);
job.setMapperClass(YourMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
// initTableReducerJob wires the reducer and TableOutputFormat to the target table
TableMapReduceUtil.initTableReducerJob(TABLE, YourReducer.class, job);
job.setReducerClass(YourReducer.class);
job.waitForCompletion(true);
Mapper & Reducer
class YourMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // @Override map()
}

class YourReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    // @Override reduce()
}
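To make the skeleton concrete, here is a minimal sketch of a reduce() body for such a TableReducer; the column family "data", the qualifier "total", and the summing logic are illustrative assumptions, not part of the original answer.

class YourReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        // The row key is the mapper's text key; write one Put per key.
        byte[] row = Bytes.toBytes(key.toString());
        Put put = new Put(row);
        put.add(Bytes.toBytes("data"), Bytes.toBytes("total"), Bytes.toBytes(sum));
        context.write(new ImmutableBytesWritable(row), put);
    }
}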
Solution 2
Not sure why the HDFS version works: normally you have to set the input format for the job, and FileInputFormat is an abstract class. Perhaps you left some lines out, such as:
job.setInputFormatClass(TextInputFormat.class);
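In context, that line would sit in the driver next to the input path setup, for example (assuming the new-API org.apache.hadoop.mapreduce.lib.input.TextInputFormat):

// Explicitly declare how HDFS input is read, then point the job at the input path
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));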
Solution 3
The best and fastest way to bulk load data into HBase is to use HFileOutputFormat and the completebulkload utility.
You will find sample code here:
Hope this will be useful :)
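A rough sketch of such a bulk-load driver might look like the following; the table name "target_table", the LogLineToPutMapper class, and the paths are illustrative assumptions rather than the linked sample (on newer HBase versions HFileOutputFormat2 replaces HFileOutputFormat).

public class BulkLoadDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "hfile-bulk-load");
        job.setJarByClass(BulkLoadDriver.class);

        // The mapper must emit <ImmutableBytesWritable row key, Put>
        job.setMapperClass(LogLineToPutMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));   // HDFS input
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // HFile staging directory

        // Sorts and partitions the map output to match the table's regions
        // and sets HFileOutputFormat as the job's output format.
        HTable table = new HTable(conf, "target_table");
        HFileOutputFormat.configureIncrementalLoad(job, table);

        job.waitForCompletion(true);

        // Afterwards, load the generated HFiles into the table with the completebulkload tool:
        //   hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <staging dir> target_table
    }
}

Note that configureIncrementalLoad handles the total-order partitioning and sorting for you, so no custom reducer is needed when the mapper emits Puts.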
jmventar
Software engineer working on a start-up. Learning every day!
Updated on June 04, 2022

Comments
-
jmventar almost 2 years
I'm new to Hadoop. I have a MapReduce job which is supposed to get its input from HDFS and write the output of the reducer to HBase. I haven't found any good example.
Here's the code. The error when running this example is: Type mismatch in map, expected ImmutableBytesWritable, received IntWritable.
Mapper Class
public static class AddValueMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, IntWritable> {

    /* input  <key: line number, value: full line>
     * output <key: log key, value: int value> */
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        byte[] outKey;
        int outValue, pos = 0;
        String line = value.toString();
        String p1, p2 = null;
        pos = line.indexOf("=");

        // Key part
        p1 = line.substring(0, pos);
        p1 = p1.trim();
        outKey = Bytes.toBytes(p1);

        // Value part
        p2 = line.substring(pos + 1);
        p2 = p2.trim();
        outValue = Integer.parseInt(p2);

        context.write(new ImmutableBytesWritable(outKey), new IntWritable(outValue));
    }
}
Reducer Class
public static class AddValuesReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

    public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        long total = 0;

        // Loop over the values
        while (values.iterator().hasNext()) {
            total += values.iterator().next().get();
        }

        // Put to HBase
        Put put = new Put(key.get());
        put.add(Bytes.toBytes("data"), Bytes.toBytes("total"), Bytes.toBytes(total));
        context.write(key, put);
    }
}
I had a similar job with HDFS only and it works fine.
Edited 18-06-2013: the college project finished successfully two years ago. For the job configuration (driver part), check the accepted answer.
-
jmventar about 13 years
Thank you for answering, there were several errors in the driver part; now it's solved.
-
jmventar almost 11 years
Thank you for your answers, the posted code was correct and the project finished two years ago.
-
jmventar almost 11 years
Thank you for your answer, your job configuration is the same one that solved my problem. I will edit and format my question when I have a bit of time to clean up this mess.
-
GenericJon over 10 years
While this link may answer the question, it is better to include the essential parts of the answer here and provide the link for reference. Link-only answers can become invalid if the linked page changes.