Pig JVM java heap space error

13,958

This is probably a problem with the BZip codec; its API documentation notes that it is rather memory-hungry.

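When you increased the memory with -Xms2048m, did you set the option for the Pig Grunt shell or for the map/reduce jobs? (Note that -Xms only sets the initial heap size; the maximum heap size is set with -Xmx.) To raise the heap of the map/reduce task JVMs, use: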
set mapred.child.java.opts=-Xmx2048m

You can verify this in the JobTracker: find the job that failed, open its job.xml, and check the value of mapred.child.java.opts.

Share:
13,958
user1426777
Author by

user1426777

Updated on June 04, 2022

Comments

  • user1426777
    user1426777 almost 2 years

    I'm trying to run a Pig script that calls a user-defined function written in Java. I'm testing the script with a very small file of 264 bytes, yet I get Java heap space errors and the job fails. I have tried running the job with the -Xms1024M option; it then runs for the smaller files but still fails with a larger file. My cluster should be powerful enough not to trip over such small files, so I wonder how I can fix this memory leak. Can someone please help?

    import java.io.IOException;
    import java.lang.annotation.Annotation;
    import java.lang.reflect.Array;
    import java.lang.reflect.Method;
    import java.text.*;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Locale;
    import java.util.Map;
    import java.util.Set;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.*;

    import com.tictactec.ta.lib.CoreAnnotated;
    import com.tictactec.ta.lib.MAType;
    import com.tictactec.ta.lib.MInteger;
    import com.tictactec.ta.lib.RetCode;
    import com.tictactec.ta.lib.meta.annotation.InputParameterInfo;
    import com.tictactec.ta.lib.meta.annotation.InputParameterType;
    import com.tictactec.ta.lib.meta.annotation.OptInputParameterInfo;
    import com.tictactec.ta.lib.meta.annotation.OptInputParameterType;
    import com.tictactec.ta.lib.meta.annotation.OutputParameterInfo;
    import com.tictactec.ta.lib.meta.annotation.OutputParameterType;
    
    public class taLib extends EvalFunc<DataBag> 
    {
    
        private static final int MIN_ARGS = 3;
    
        public static CoreAnnotated core = new CoreAnnotated();
        private static Method func_ref = null;
    
        public DecimalFormat df = new DecimalFormat("#.###");
    
    
        public DataBag exec(Tuple args) throws IOException 
        {
    
            DataBag input=null;
            MInteger outStart = new MInteger();
            MInteger outLen = new MInteger();
            Map<String,Object>outputParams=new HashMap<String, Object>();
            String func_name;
            List<Integer> ip_colmns= new ArrayList<Integer>();
            List<double[]>ip_list=new ArrayList<double[]>();
            List<String>opt_type=new ArrayList<String>();
            List<Object>opt_params=new ArrayList<Object>();
            //////
    
            long m1=Runtime.getRuntime().freeMemory();
            System.out.println(m1);
            long m2=Runtime.getRuntime().totalMemory();
            System.out.println(m2);
            //////
            int ip_noofparams=0;
            int op_noofparams=0;
            int opt_noofparams=0;
    
            if (args == null || args.size() < MIN_ARGS)
                throw new IllegalArgumentException("talib: must have at least " +   
    MIN_ARGS + "  args");
    
            if(args.get(0) instanceof DataBag)
            {input = (DataBag)args.get(0);}
            else{throw new IllegalArgumentException("Only a valid bag name can be 
    passed");}
    
            // get no of fields in bag
            Tuple t0=input.iterator().next();
            int fields_in_bag=t0.getAll().size();
    
            if(args.get(1) instanceof String)
            {func_name = (String)args.get(1);}
            else{throw new IllegalArgumentException("Only valid function name can be 
    passed at arg 1");}
            func_ref=methodChk(func_name);
    
            if (func_ref == null) {
                throw new IllegalArgumentException("talib: function " 
                                   + func_name + " was not found");
            }
    
            for (Annotation[] annotations : func_ref.getParameterAnnotations()) 
                 {
                  for (Annotation annotation : annotations) 
                   {
                    if(annotation instanceof InputParameterInfo)
                    {
                     InputParameterInfo inputParameterInfo = 
    (InputParameterInfo)annotation;
    
    
    if(inputParameterInfo.type().equals(InputParameterType.TA_Input_Price))
                       {
    
    ip_noofparams=numberOfSetBits(inputParameterInfo.flags());
                       }
                        else
                        {
                        ip_noofparams++;
                        }
                    }
                    if(annotation instanceof OptInputParameterInfo)
                    {
                        OptInputParameterInfo optinputParameterInfo=
    (OptInputParameterInfo)annotation;
                        opt_noofparams++;
                        if 
    (optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerRange))
                        {
                            opt_type.add("Integer");
                        }
                        else 
     if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_RealRange))
                        {
                            opt_type.add("Double");
                        }
                        else  
     if(optinputParameterInfo.type().equals(OptInputParameterType.TA_OptInput_IntegerList))
                        {
                            opt_type.add("String");
                        }
                        else{throw new IllegalArgumentException("whoopsie ...serious 
     mess in opt_annotations");}
    
                    }
                    if (annotation instanceof OutputParameterInfo) 
                    {
                                OutputParameterInfo outputParameterInfo = 
     (OutputParameterInfo) annotation;
                                op_noofparams++;
                        if 
     (outputParameterInfo.type().equals(OutputParameterType.TA_Output_Real)) 
                        {
                                  outputParams.put(outputParameterInfo.paramName(), new 
     double[(int) input.size()]);
                        }
                        else if    
     (outputParameterInfo.type().equals(OutputParameterType.TA_Output_Integer)) 
                        {
                                  outputParams.put(outputParameterInfo.paramName(), new  
      int[(int)input.size()]);
                                }
                    } 
                   }
                }
    
            int total_params =ip_noofparams+opt_noofparams;
            if((args.size()-2)!=total_params){throw new IllegalArgumentException("Wrong   
     no of argumets passed to UDF");}
    
    
             // get the ip colmns no's 
            for(int i=2;i<(2+ip_noofparams);i++)
            {   
               if(args.get(i) instanceof Integer )
                 {
                  if((Integer)args.get(i)>=0 && (Integer)args.get(i)<fields_in_bag)
                {
                ip_colmns.add((Integer) args.get(i));
                }
                else{throw new IllegalArgumentException("The input colmn specified 
     is invalid..please enter a valid colmn no:0-"+(fields_in_bag-1));}     
                 }  
                   else{throw new IllegalArgumentException("Wrong arguments entered: 
     Only"+ip_noofparams+"field no's of type(integer) allowed for fn"+func_name ); }
    
             }
    
            // create a list of ip arrays 
            for(int i=0;i<ip_colmns.size();i++)
            {
             ip_list.add((double[]) Array.newInstance(double.class, (int)input.size()));
            }
            int z=0;
            int x=0;
            // fill up the arrays
            for(Tuple t1: input)
            {       
    
                Iterator<double[]> itr=ip_list.iterator();
                z=0;
                while(itr.hasNext())
                {
    
                 if((Double)t1.get(ip_colmns.get(z)) instanceof Double)
                 {
                  ((double[])itr.next())[x]=(Double) t1.get(ip_colmns.get(z++));
                 }
                 else{throw new IllegalArgumentException("Illegal argument while 
     filling up array...only double typr allowed");}
                }
                x++;
            }
    
            //deal with opt params
            int s=0;
            for(int i=(2+ip_noofparams);i<(2+ip_noofparams+opt_noofparams);i++)
            {
    
    
    
     if(opt_type.get(s).equalsIgnoreCase(args.get(i).getClass().getSimpleName().toString()))
                {       
                    if(opt_type.get(s).equalsIgnoreCase("String"))
                    {
                        String m=args.get(i).toString().toLowerCase();
                        String ma=m.substring(0, 1).toUpperCase();
                        String mac=m.substring(1);
                        String macd=ma+mac;
                        MAType type =MAType.valueOf(macd);
                        opt_params.add(type);
                        s++;
                    }
    
                    else{
                        opt_params.add(args.get(i));
                        s++;
                        }
    
                }
                else if(opt_type.get(s).equalsIgnoreCase("Double"))
                        {
    
    
     if(args.get(i).getClass().getSimpleName().toString().equalsIgnoreCase("Integer"))
                         {
                        opt_params.add((Double)((Integer)args.get(i)+0.0));
                        s++;
                         } 
                      else{throw new IllegalArgumentException("Opt arguments do   
     not match for fn:"+func_name+", pls enter opt arguments in right order"); }
                        }
                else{throw new IllegalArgumentException("Opt arguments do not match 
     for fn:"+func_name+", pls enter opt arguments in right order");}
    
            } 
    
    
            List<Object> ta_argl = new ArrayList<Object>();
            ta_argl.add(new Integer(0));
            ta_argl.add(new Integer((int)input.size() - 1));
            for(double[]in: ip_list)
            {
             ta_argl.add(in);
            }
    
            if(opt_noofparams!=0)
            {ta_argl.addAll(opt_params);}
            ta_argl.add(outStart);
            ta_argl.add(outLen);
    
            for (Map.Entry<String, Object> entry : outputParams.entrySet()) 
                {
                 ta_argl.add(entry.getValue());
                }
    
    
                RetCode rc = RetCode.Success;
            try {
                rc = (RetCode)func_ref.invoke(core, ta_argl.toArray());
                } catch (Exception e) 
                        {
                assert false : "I died in ta-lib, but Java made me a zombie...";
                }
    
            assert rc == RetCode.Success : "ret code from " + func_name;
    
    
    
            if (outLen.value == 0) return null;
    
            //////
            DataBag ret=null;
            ret =outTA(input,outputParams,outStart);
            outputParams.clear();
            ip_list.clear();
            opt_params.clear();
            opt_type.clear();
            ip_colmns.clear();
            Runtime.getRuntime().gc();
            return ret;
    
        }
    
    
    
    
    
        public DataBag outTA(DataBag bag,Map<String, Object> outputParams,MInteger outStart)
        {
            DataBag nbag=null;
            TupleFactory mTupleFactory=TupleFactory.getInstance();
            BagFactory mBagFactory=BagFactory.getInstance();
            nbag=mBagFactory.newDefaultBag();
            Tuple tw=bag.iterator().next();
            int fieldsintup=tw.getAll().size();
    
    
            for(Tuple t0: bag)
            {
                Tuple t1=mTupleFactory.newTuple();
    
                for(int z=0;z<fieldsintup;z++)
                {
                    try {
                        t1.append(t0.get(z));
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        System.out.println("Ouch");
                    }
                }
                nbag.add(t1);
            }
    
            int i = 0;
            int j=0;        
            for (Tuple t2: nbag) 
            {
             if(i>=outStart.value)
              {
                for(Map.Entry<String,Object>entry: outputParams.entrySet())
                {
                t2.append(entry.getKey().substring(3).toString());
    
                 if(entry.getValue() instanceof double[])
                  {
                   t2.append( new Double 
    (df.format(((double[])entry.getValue())[j])));
                  }
                 else if(entry.getValue() instanceof int[])
                  {
                   t2.append( ((int[])entry.getValue())[j]);
                  }
                 else{throw new 
    IllegalArgumentException(entry.getValue().getClass()+"not supported");}
                }       
                i++;j++;
              }
              else
               {t2.append(0.0);
                i++;    
                   }
    
            }
    
            return nbag;
        }
    
        public Method methodChk(String fn)
        {
            String fn_name=fn;
            Method tmp_fn=null;
            for (Method meth: core.getClass().getDeclaredMethods()) 
         {
          if (meth.getName().equalsIgnoreCase(fn_name)) 
           {
            tmp_fn = meth;
            break;
           }
         }
            return tmp_fn;
        }
    
    
        public int numberOfSetBits(int i) {
            i = i - ((i >> 1) & 0x55555555);
            i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
            return ((i + (i >> 4) & 0xF0F0F0F) * 0x1010101) >> 24;
        }
    
    }