Pig JVM java heap space error
13,958
This is probably a problem with the BZip2 codec — the API documentation notes that it is rather memory-hungry:
-
http://hadoop.apache.org/common/docs/r0.20.0/api/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.html
The compression requires large amounts of memory
-Xms2048m
Did you set the options for the Pig grunt shell, or for the map/reduce jobs?
set mapred.child.java.opts=-Xmx2048m
You can check this in the JobTracker: find the job that failed, open its job.xml, and look up the value of mapred.child.java.opts.
Author by
user1426777
Updated on June 04, 2022
Comments
-
user1426777 almost 2 years
I'm trying to run a Pig script that calls a user-defined function written in Java. I'm testing this script with a very small file of 264 bytes, yet I end up getting Java heap space errors and the job fails. I have tried running the job with the -Xms1024M option; it then runs for the smaller files but still fails with a larger file. My cluster is powerful enough not to trip over such small files, so I wonder how I can fix this memory leak. Can someone please help?
import java.util.HashMap; import java.lang.annotation.Annotation; import java.lang.reflect.Array; import java.lang.reflect.Method; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.Set; import java.text.*; import org.apache.pig.EvalFunc; import org.apache.pig.data.*; import com.tictactec.ta.lib.CoreAnnotated; import com.tictactec.ta.lib.MAType; import com.tictactec.ta.lib.MInteger; import com.tictactec.ta.lib.RetCode; import com.tictactec.ta.lib.meta.annotation.InputParameterInfo; import com.tictactec.ta.lib.meta.annotation.InputParameterType; import com.tictactec.ta.lib.meta.annotation.OptInputParameterInfo; import com.tictactec.ta.lib.meta.annotation.OptInputParameterType; import com.tictactec.ta.lib.meta.annotation.OutputParameterInfo; import com.tictactec.ta.lib.meta.annotation.OutputParameterType; public class taLib extends EvalFunc<DataBag> { private static final int MIN_ARGS = 3; public static CoreAnnotated core = new CoreAnnotated(); private static Method func_ref = null; public DecimalFormat df = new DecimalFormat("#.###"); public DataBag exec(Tuple args) throws IOException { DataBag input=null; MInteger outStart = new MInteger(); MInteger outLen = new MInteger(); Map<String,Object>outputParams=new HashMap<String, Object>(); String func_name; List<Integer> ip_colmns= new ArrayList<Integer>(); List<double[]>ip_list=new ArrayList<double[]>(); List<String>opt_type=new ArrayList<String>(); List<Object>opt_params=new ArrayList<Object>(); ////// long m1=Runtime.getRuntime().freeMemory(); System.out.println(m1); long m2=Runtime.getRuntime().totalMemory(); System.out.println(m2); ////// int ip_noofparams=0; int op_noofparams=0; int opt_noofparams=0; if (args == null || args.size() < MIN_ARGS) throw new IllegalArgumentException("talib: must have at least " + MIN_ARGS + " args"); if(args.get(0) instanceof DataBag) {input = (DataBag)args.get(0);} else{throw new 
IllegalArgumentException("Only a valid bag name can be passed");} // get no of fields in bag Tuple t0=input.iterator().next(); int fields_in_bag=t0.getAll().size(); if(args.get(1) instanceof String) {func_name = (String)args.get(1);} else{throw new IllegalArgumentException("Only valid function name can be passed at arg 1");} func_ref=methodChk(func_name); if (func_ref == null) { throw new IllegalArgumentException("talib: function " + func_name + " was not found"); } for (Annotation[] annotations : func_ref.getParameterAnnotations()) { for (Annotation annotation : annotations) { if(annotation instanceof InputParameterInfo) { InputParameterInfo inputParameterInfo = (InputParameterInfo)annotation; if(inputParameterInfo.type().equals(InputParameterType.TA_Input_Price)) { ip_noofparams=numberOfSetBits(inputParameterInfo.flags()); } else { ip_noofparams++; } } if(annotation instanceof OptInputParameterInfo) { OptInputParameterInfo optinputParameterInfo= (OptInputParameterInfo)annotation; opt_noofparams++; if (optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerRange)) { opt_type.add("Integer"); } else if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_RealRange)) { opt_type.add("Double"); } else if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerList)) { opt_type.add("String"); } else{throw new IllegalArgumentException("whoopsie ...serious mess in opt_annotations");} } if (annotation instanceof OutputParameterInfo) { OutputParameterInfo outputParameterInfo = (OutputParameterInfo) annotation; op_noofparams++; if (outputParameterInfo.type().equals(OutputParameterType.TA_Output_Real)) { outputParams.put(outputParameterInfo.paramName(), new double[(int) input.size()]); } else if (outputParameterInfo.type().equals(OutputParameterType.TA_Output_Integer)) { outputParams.put(outputParameterInfo.paramName(), new int[(int)input.size()]); } } } } int total_params =ip_noofparams+opt_noofparams; 
if((args.size()-2)!=total_params){throw new IllegalArgumentException("Wrong no of argumets passed to UDF");} // get the ip colmns no's for(int i=2;i<(2+ip_noofparams);i++) { if(args.get(i) instanceof Integer ) { if((Integer)args.get(i)>=0 && (Integer)args.get(i)<fields_in_bag) { ip_colmns.add((Integer) args.get(i)); } else{throw new IllegalArgumentException("The input colmn specified is invalid..please enter a valid colmn no:0-"+(fields_in_bag-1));} } else{throw new IllegalArgumentException("Wrong arguments entered: Only"+ip_noofparams+"field no's of type(integer) allowed for fn"+func_name ); } } // create a list of ip arrays for(int i=0;i<ip_colmns.size();i++) { ip_list.add((double[]) Array.newInstance(double.class, (int)input.size())); } int z=0; int x=0; // fill up the arrays for(Tuple t1: input) { Iterator<double[]> itr=ip_list.iterator(); z=0; while(itr.hasNext()) { if((Double)t1.get(ip_colmns.get(z)) instanceof Double) { ((double[])itr.next())[x]=(Double) t1.get(ip_colmns.get(z++)); } else{throw new IllegalArgumentException("Illegal argument while filling up array...only double typr allowed");} } x++; } //deal with opt params int s=0; for(int i=(2+ip_noofparams);i<(2+ip_noofparams+opt_noofparams);i++) { if(opt_type.get(s).equalsIgnoreCase(args.get(i).getClass().getSimpleName().toString())) { if(opt_type.get(s).equalsIgnoreCase("String")) { String m=args.get(i).toString().toLowerCase(); String ma=m.substring(0, 1).toUpperCase(); String mac=m.substring(1); String macd=ma+mac; MAType type =MAType.valueOf(macd); opt_params.add(type); s++; } else{ opt_params.add(args.get(i)); s++; } } else if(opt_type.get(s).equalsIgnoreCase("Double")) { if(args.get(i).getClass().getSimpleName().toString().equalsIgnoreCase("Integer")) { opt_params.add((Double)((Integer)args.get(i)+0.0)); s++; } else{throw new IllegalArgumentException("Opt arguments do not match for fn:"+func_name+", pls enter opt arguments in right order"); } } else{throw new IllegalArgumentException("Opt 
arguments do not match for fn:"+func_name+", pls enter opt arguments in right order");} } List<Object> ta_argl = new ArrayList<Object>(); ta_argl.add(new Integer(0)); ta_argl.add(new Integer((int)input.size() - 1)); for(double[]in: ip_list) { ta_argl.add(in); } if(opt_noofparams!=0) {ta_argl.addAll(opt_params);} ta_argl.add(outStart); ta_argl.add(outLen); for (Map.Entry<String, Object> entry : outputParams.entrySet()) { ta_argl.add(entry.getValue()); } RetCode rc = RetCode.Success; try { rc = (RetCode)func_ref.invoke(core, ta_argl.toArray()); } catch (Exception e) { assert false : "I died in ta-lib, but Java made me a zombie..."; } assert rc == RetCode.Success : "ret code from " + func_name; if (outLen.value == 0) return null; ////// DataBag ret=null; ret =outTA(input,outputParams,outStart); outputParams.clear(); ip_list.clear(); opt_params.clear(); opt_type.clear(); ip_colmns.clear(); Runtime.getRuntime().gc(); return ret; } public DataBag outTA(DataBag bag,Map<String, Object> outputParams,MInteger outStart) { DataBag nbag=null; TupleFactory mTupleFactory=TupleFactory.getInstance(); BagFactory mBagFactory=BagFactory.getInstance(); nbag=mBagFactory.newDefaultBag(); Tuple tw=bag.iterator().next(); int fieldsintup=tw.getAll().size(); for(Tuple t0: bag) { Tuple t1=mTupleFactory.newTuple(); for(int z=0;z<fieldsintup;z++) { try { t1.append(t0.get(z)); } catch (Exception e) { // TODO Auto-generated catch block System.out.println("Ouch"); } } nbag.add(t1); } int i = 0; int j=0; for (Tuple t2: nbag) { if(i>=outStart.value) { for(Map.Entry<String,Object>entry: outputParams.entrySet()) { t2.append(entry.getKey().substring(3).toString()); if(entry.getValue() instanceof double[]) { t2.append( new Double (df.format(((double[])entry.getValue())[j]))); } else if(entry.getValue() instanceof int[]) { t2.append( ((int[])entry.getValue())[j]); } else{throw new IllegalArgumentException(entry.getValue().getClass()+"not supported");} } i++;j++; } else {t2.append(0.0); i++; } } return 
nbag; } public Method methodChk(String fn) { String fn_name=fn; Method tmp_fn=null; for (Method meth: core.getClass().getDeclaredMethods()) { if (meth.getName().equalsIgnoreCase(fn_name)) { tmp_fn = meth; break; } } return tmp_fn; } public int numberOfSetBits(int i) { i = i - ((i >> 1) & 0x55555555); i = (i & 0x33333333) + ((i >> 2) & 0x33333333); return ((i + (i >> 4) & 0xF0F0F0F) * 0x1010101) >> 24; } }