What is the fastest way to bulk load data into HBase programmatically?
I've gone through what is probably a very similar process to yours, trying to find an efficient way to load data from an MR job into HBase. What I found to work was using HFileOutputFormat as the output format class of the MR job.
Below is the basis of my code: the function that generates the job, and the Mapper's map function that writes out the data. It was fast. We don't use it anymore, so I don't have numbers on hand, but it loaded around 2.5 million records in under a minute.
Here is the (stripped-down) function I wrote to generate the job for my MapReduce process to put the data into HBase:
private Job createCubeJob(...) {
    // Build and configure the job
    // (conf, jobName, paths, etc. are fields set up elsewhere -- this is stripped down)
    Job job = new Job(conf);
    job.setJobName(jobName);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setMapperClass(HiveToHBaseMapper.class); // custom Mapper
    job.setJarByClass(CubeBuilderDriver.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    TextInputFormat.setInputPaths(job, hiveOutputDir);
    HFileOutputFormat.setOutputPath(job, cubeOutputPath);

    // Point the job at the target cluster/table so HFileOutputFormat can
    // partition the output to match the table's regions
    Configuration hConf = HBaseConfiguration.create(conf);
    hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
    hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);
    HTable hTable = new HTable(hConf, tableName);
    HFileOutputFormat.configureIncrementalLoad(job, hTable);

    return job;
}
This is my map function from the HiveToHBaseMapper class (slightly edited):
public void map(WritableComparable key, Writable val, Context context)
        throws IOException, InterruptedException {
    try {
        Configuration config = context.getConfiguration();
        String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR);
        String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY);
        String column = strs[COLUMN_INDEX];
        double value = Double.parseDouble(strs[VALUE_INDEX]);
        String sKey = generateKey(strs, config);
        byte[] bKey = Bytes.toBytes(sKey);

        Put put = new Put(bKey);
        // Zero/negative values are stored as Double.MIN_VALUE as a sentinel
        put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0)
                ? Bytes.toBytes(Double.MIN_VALUE)
                : Bytes.toBytes(value));

        ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
        context.write(ibKey, put);
        context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1);
    } catch (Exception e) {
        context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);
    }
}
I'm pretty sure this isn't going to be a copy-and-paste solution for you. Obviously, the data I was working with here didn't need any custom processing (that was done in an MR job before this one). The main thing I want to point out is HFileOutputFormat; the rest is just an example of how I used it. :)
I hope it gets you onto a solid path to a good solution.
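For completeness (this step isn't shown above): HFileOutputFormat only writes HFiles into the job's output directory; they still have to be handed to the region servers afterwards. A sketch of that final step using LoadIncrementalHFiles from the same HBase generation, reusing tableName and cubeOutputPath from the job above (this needs a live cluster, so it's illustrative only):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// After job.waitForCompletion(true) succeeds, move the generated
// HFiles into the live table.
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, tableName);
new LoadIncrementalHFiles(conf).doBulkLoad(cubeOutputPath, table);
```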
Cihan Keser
Updated on July 09, 2022

Comments
-
Cihan Keser almost 2 years
I have a plain-text file with possibly millions of lines which needs custom parsing, and I want to load it into an HBase table as fast as possible (using Hadoop or the HBase Java client).
My current solution is based on a MapReduce job without the Reduce part. I use FileInputFormat to read the text file so that each line is passed to the map method of my Mapper class. At that point, the line is parsed to form a Put object which is written to the context. Then, TableOutputFormat takes the Put object and inserts it into the table.
This solution yields an average insertion rate of 1,000 rows per second, which is less than what I expected. My HBase setup is in pseudo-distributed mode on a single server.
One interesting thing is that during insertion of 1,000,000 rows, 25 Mappers (tasks) are spawned but they run serially (one after another); is this normal?
Here is the code for my current solution:
public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    protected void map(LongWritable key, Text value, Context context) throws IOException {
        Map<String, String> parsedLine = parseLine(value.toString());
        Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
        for (String currentKey : parsedLine.keySet()) {
            row.add(Bytes.toBytes(currentKey), Bytes.toBytes(currentKey),
                    Bytes.toBytes(parsedLine.get(currentKey)));
        }
        try {
            context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public int run(String[] args) throws Exception {
    if (args.length != 2) {
        return -1;
    }

    conf.set("hbase.mapred.outputtable", args[1]);

    // I got these conf parameters from a presentation about Bulk Load
    conf.set("hbase.hstore.blockingStoreFiles", "25");
    conf.set("hbase.hregion.memstore.block.multiplier", "8");
    conf.set("hbase.regionserver.handler.count", "30");
    conf.set("hbase.regions.percheckin", "30");
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

    Job job = new Job(conf);
    job.setJarByClass(BulkLoadMapReduce.class);
    job.setJobName(NAME);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(CustomMap.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TableOutputFormat.class);
    job.waitForCompletion(true);
    return 0;
}

public static void main(String[] args) throws Exception {
    Long startTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("Start time : " + startTime);

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

    Long endTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("End time : " + endTime);
    System.out.println("Duration milliseconds: " + (endTime - startTime));

    System.exit(errCode);
}
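The code above calls a parseLine helper that isn't shown. As a self-contained stand-in, here is a hypothetical version that splits tab-separated name=value pairs; the real input format isn't specified, so this is purely illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class LineParser {
    // Hypothetical stand-in for the parseLine helper used above:
    // parses a line of tab-separated "name=value" pairs into a map.
    static Map<String, String> parseLine(String line) {
        Map<String, String> fields = new HashMap<>();
        for (String pair : line.split("\t")) {
            int eq = pair.indexOf('=');
            if (eq > 0) { // skip tokens without a "name=" prefix
                fields.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parseLine("id=42\tname=foo"));
    }
}
```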
-
Michael Dautermann over 12 yearsI'm assuming you wanted your title to be "bulk load" and not "bluk load"... but let me know if my correction was wrong. :-)
-
Chris Shain over 12 yearsHave you read this? hbase.apache.org/docs/r0.89.20100621/bulk-loads.html
-
Chris Shain over 12 yearsAlso, have you pre-split your regions? If not, you basically have a single-threaded writer, which would explain it. You basically get one writer per region.
-
Cihan Keser over 12 yearsThanks for the typo correction, Michael. I actually proofread my question twice... well, so much for stackoverflowing at 2 a.m.
-
Cihan Keser over 12 years@Chris: Yes, I've read that link earlier. About pre-splitting my regions: I didn't really understand that concept :(. Care to explain it, or show how to do it?
-
Chris Shain over 12 yearsEach row is in exactly one region, determined by its row key. You can see which key ranges each region has in the web UI. If you don't pre-split regions, all the data you insert will go into the first region until you reach the threshold for splitting that region, at which point it is split evenly at whatever key happens to be the median of the keys in the region. So all of your puts go to one region (on one node) until the split happens, and then only to two regions, and so on. By pre-splitting, you parallelize writes from the start.
-
Chris Shain over 12 yearsPre-splitting is more of an art than a science. You need to find good key ranges that make sense for your data. To perform the pre-split, you can use the RegionSplitter class described here: hbase.apache.org/apidocs/org/apache/hadoop/hbase/util/…. You launch this from the command line as an argument to the hbase shell command, e.g. "hbase org.apache.hadoop.hbase.util.RegionSplitter"
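To make "find good key ranges" concrete, here is a tiny self-contained sketch (plain Java, no HBase dependency) of computing evenly spaced split points over a hex-prefixed keyspace. A real table would base the boundaries on the actual key distribution, e.g. via RegionSplitter or Bytes.split:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitPoints {
    // Compute (numRegions - 1) evenly spaced single-byte-prefix split keys
    // over the unsigned byte range 0x00..0xFF, rendered as two-hex-digit
    // row-key prefixes.
    static List<String> hexSplits(int numRegions) {
        List<String> splits = new ArrayList<>();
        for (int i = 1; i < numRegions; i++) {
            int boundary = (int) ((256L * i) / numRegions);
            splits.add(String.format("%02x", boundary));
        }
        return splits;
    }

    public static void main(String[] args) {
        System.out.println(hexSplits(4)); // prints [40, 80, c0]
    }
}
```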
-
Cihan Keser over 12 yearsTried that but the result didn't change.
-
Praveen Sripati over 12 yearsWhere did you specify the parameter? It should be specified in the mapred-site.xml on all the nodes before the Hadoop daemons start. Check this documentation. How did you verify? Can be verified from the JobTracker Web Console.
-
Cihan Keser over 12 yearsI tried using HFileOutputFormat in my code, but I keep getting the exception below, any ideas?
java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:82)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:508)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156) ...
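The cast in that stack trace comes from the fact that this generation of HFileOutputFormat writes KeyValue cells. HFileOutputFormat.configureIncrementalLoad(job, table) normally installs a PutSortReducer that converts each Put to KeyValues, so the error usually means Puts are reaching the writer directly (e.g. configureIncrementalLoad wasn't called, or the reducer was overridden). One alternative is to emit KeyValue from the mapper yourself; a sketch, with rowKey, family, qualifier, and cellValue as illustrative names:

```java
// Illustrative mapper fragment: emit KeyValue (what this HFileOutputFormat
// expects) instead of Put. Also requires
// job.setMapOutputValueClass(KeyValue.class) in the driver.
byte[] row = Bytes.toBytes(rowKey);
KeyValue kv = new KeyValue(row,
        Bytes.toBytes(family),      // column family
        Bytes.toBytes(qualifier),   // column qualifier
        Bytes.toBytes(cellValue));  // cell value
context.write(new ImmutableBytesWritable(row), kv);
```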
-
QuinnG over 12 years@kramer Other than that you're attempting to write a different type than it is expecting (hence the cast error), not really. Would need to see the code to take a shot at that. -
Alexey Tigarev about 11 yearsIs HFileOutputFormat faster than TableOutputFormat, given an equal situation with region splitting?
-
Jon Cardoso almost 11 yearsWhich jar has this HFileOutputFormat class? I can't find it.