How to use Spark's .newAPIHadoopRDD() from Java


Finally I got it resolved after much struggle. The problem is that newAPIHadoopRDD requires a class that extends org.apache.hadoop.mapreduce.InputFormat, and org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat does not extend InputFormat directly; instead it extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat, which in turn extends InputFormat.

Eclipse uses the Groovy compiler, which is smart enough to resolve this, but Java's default compiler is not. The Groovy compiler also resolves the K,V type parameters properly, where the Java compiler finds them incompatible.
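
As an aside, if you would rather stay on plain javac, a minimal sketch of an alternative workaround is shown below: erase the generics on the key/value class literals with an unchecked double cast, so the inferred K,V become Map<String, ByteBuffer>, which matches what CqlPagingInputFormat declares. The helper name buildCassandraRdd is made up for the example; this is only an illustration, not the approach this answer takes.

    import java.nio.ByteBuffer;
    import java.util.Map;

    import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CassandraRddSketch {

        // Illustrative helper, not part of the original answer.
        @SuppressWarnings("unchecked")
        static JavaPairRDD<Map<String, ByteBuffer>, Map<String, ByteBuffer>> buildCassandraRdd(
                JavaSparkContext jsc, Job job) {
            // The double cast erases the generics, so javac infers
            // K = V = Map<String, ByteBuffer>, satisfying the bound
            // F extends InputFormat<K, V> for CqlPagingInputFormat.
            Class<Map<String, ByteBuffer>> mapClass =
                    (Class<Map<String, ByteBuffer>>) (Class<?>) Map.class;

            return jsc.newAPIHadoopRDD(job.getConfiguration(),
                    CqlPagingInputFormat.class,
                    mapClass,   // key class
                    mapClass);  // value class
        }
    }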

You need to make the following changes to your pom.xml to use the Groovy compiler:

    <properties>
        <groovy-version>1.8.6</groovy-version>
        <maven-compiler-plugin-version>2.5.1</maven-compiler-plugin-version>
        <groovy-eclipse-compiler-version>2.7.0-01</groovy-eclipse-compiler-version>
        <maven-clover2-plugin-version>3.1.7</maven-clover2-plugin-version>
        <groovy-eclipse-batch-version>1.8.6-01</groovy-eclipse-batch-version>
        <!-- jdk-version is referenced by the compiler plugin below; 1.7 is only an assumed value, set it to your JDK -->
        <jdk-version>1.7</jdk-version>
    </properties>
  1. Add Groovy as a dependency

    <dependencies>
        <dependency>
            <groupId>org.codehaus.groovy</groupId>
            <artifactId>groovy-all</artifactId>
            <version>${groovy-version}</version>
        </dependency>
    </dependencies>
    
  2. Add the Groovy plugin under build to use it as the compiler for our code

    <build>
        <pluginManagement>
            <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin-version}</version>
                <configuration>
                    <!-- Bind Groovy Eclipse Compiler -->
                    <compilerId>groovy-eclipse-compiler</compilerId>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                </configuration>
                <dependencies>
                    <!-- Define which Groovy version will be used for build (default is 
                        2.0) -->
                    <dependency>
                        <groupId>org.codehaus.groovy</groupId>
                        <artifactId>groovy-eclipse-batch</artifactId>
                        <version>${groovy-eclipse-batch-version}</version>
                    </dependency>
                    <!-- Define dependency to Groovy Eclipse Compiler (as it's referred 
                        in compilerId) -->
                    <dependency>
                        <groupId>org.codehaus.groovy</groupId>
                        <artifactId>groovy-eclipse-compiler</artifactId>
                        <version>${groovy-eclipse-compiler-version}</version>
                    </dependency>
                </dependencies>
            </plugin>
            <!-- Define Groovy Eclipse Compiler again and set extensions=true. Thanks 
                to this, plugin will -->
            <!-- enhance default build life cycle with an extra phase which adds 
                additional Groovy source folders -->
            <!-- It works fine under Maven 3.x, but we've encountered problems with 
                Maven 2.x -->
            <plugin>
                <groupId>org.codehaus.groovy</groupId>
                <artifactId>groovy-eclipse-compiler</artifactId>
                <version>${groovy-eclipse-compiler-version}</version>
                <extensions>true</extensions>
            </plugin>
            <!-- Configure Clover for Maven plug-in. Please note that it's not bound 
                to any execution phase, --> 
            <!-- so you'll have to call Clover goals from command line. -->
            <plugin>
                <groupId>com.atlassian.maven.plugins</groupId>
                <artifactId>maven-clover2-plugin</artifactId>
                <version>${maven-clover2-plugin-version}</version>
                <configuration>
                    <generateHtml>true</generateHtml>
                    <historyDir>.cloverhistory</historyDir>
                </configuration>
            </plugin>
            </plugins>
        </pluginManagement>
    </build>
    

This should solve it.

Comments

  • martingms almost 2 years

    I am trying to port an example written in Scala (from the Apache Spark project) into Java, and am running into some issues.

    The code

    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
      classOf[CqlPagingInputFormat],
      classOf[java.util.Map[String,ByteBuffer]],
      classOf[java.util.Map[String,ByteBuffer]])
    

    from the original Scala example builds and runs just fine, but

    JavaPairRDD rdd = jsc.newAPIHadoopRDD(job.getConfiguration(),
      CqlPagingInputFormat.class,
      java.util.Map<String, ByteBuffer>.class,
      java.util.Map<String, ByteBuffer>.class);
    

    is not allowed in Java (Cannot select from parameterized type).

    Changing

    java.util.Map<String, ByteBuffer>.class
    

    into

    Class.forName("java.util.Map<String, ByteBuffer>")
    

    yields a new error:

    Error:(42, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
    required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
    found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<capture#1 of ?>,java.lang.Class<capture#2 of ?>
    reason: inferred type does not conform to declared bound(s)
    inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
    bound(s): org.apache.hadoop.mapreduce.InputFormat<capture#1 of ?,capture#2 of ?>
    

    Changing it into simply java.util.Map.class yields a similar error:

    Error:(44, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
    required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
    found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<java.util.Map>,java.lang.Class<java.util.Map>
    reason: inferred type does not conform to declared bound(s)
    inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
    bound(s): org.apache.hadoop.mapreduce.InputFormat<java.util.Map,java.util.Map>
    

    So what is the correct translation? It is worth noting that newAPIHadoopRDD() has separate implementations for Scala and for Java; the Java version is documented at http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#newAPIHadoopRDD(org.apache.hadoop.conf.Configuration, java.lang.Class, java.lang.Class, java.lang.Class), and a paraphrase of its shape is included after the declaration below.

    The declaration of CqlPagingInputFormat looks like this

    public class CqlPagingInputFormat extends
            org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat<
                java.util.Map<java.lang.String, java.nio.ByteBuffer>,
                java.util.Map<java.lang.String, java.nio.ByteBuffer>> {
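
    For reference, the Java overload being called has roughly the following shape (a paraphrase based on the JavaSparkContext documentation linked above, not the actual Spark source); the F extends InputFormat<K, V> bound is what the "inferred type does not conform to declared bound(s)" errors refer to:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.spark.api.java.JavaPairRDD;

    // Paraphrased shape only, for illustration.
    interface NewApiHadoopRddShape {
        <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V> newAPIHadoopRDD(
                Configuration conf,
                Class<F> fClass,   // the InputFormat implementation, e.g. CqlPagingInputFormat
                Class<K> kClass,   // key class
                Class<V> vClass);  // value class
    }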