How to use Spark's .newAPIHadoopRDD() from Java
Finally I got it resolved after much of a fight. The problem is that newAPIHadoopRDD requires an input format class that extends org.apache.hadoop.mapreduce.InputFormat, and org.apache.cassandra.hadoop.cql3.CqlInputFormat does not extend InputFormat directly; instead it extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat, which in turn extends InputFormat.
Eclipse uses the Groovy compiler, which is smart enough to resolve this, but Java's default compiler is not. The Groovy compiler also resolves the K,V type parameters properly, where the Java compiler finds them incompatible.
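In other words, javac cannot prove that the F type parameter (bounded by F extends InputFormat<K, V>) matches when K and V come from raw Map class literals. If you would rather stay on plain javac, one workaround is an unchecked cast on the class literals. This is only a sketch, not part of the original fix; it assumes the Map<String, ByteBuffer> key/value types that CqlPagingInputFormat declares, and the jsc/job variables from the question:

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.spark.api.java.JavaPairRDD;

// Force the raw Map.class literal to the parameterized type the bound needs.
@SuppressWarnings("unchecked")
Class<Map<String, ByteBuffer>> kvClass =
        (Class<Map<String, ByteBuffer>>) (Class<?>) Map.class;

JavaPairRDD<Map<String, ByteBuffer>, Map<String, ByteBuffer>> rdd =
        jsc.newAPIHadoopRDD(
                job.getConfiguration(),
                CqlPagingInputFormat.class, // F: satisfies InputFormat<K, V> via AbstractColumnFamilyInputFormat
                kvClass,                    // K
                kvClass);                   // V

The Groovy compiler route below avoids the cast entirely.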
To use the Groovy compiler, you need to make the following changes to your pom.xml file:
<properties>
  <groovy-version>1.8.6</groovy-version>
  <maven-compiler-plugin-version>2.5.1</maven-compiler-plugin-version>
  <groovy-eclipse-compiler-version>2.7.0-01</groovy-eclipse-compiler-version>
  <maven-clover2-plugin-version>3.1.7</maven-clover2-plugin-version>
  <groovy-eclipse-batch-version>1.8.6-01</groovy-eclipse-batch-version>
</properties>
Add Groovy as a dependency:
<dependencies>
  <dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy-all</artifactId>
    <version>${groovy-version}</version>
  </dependency>
</dependencies>
Add the Groovy plugin under build to use it as the compiler for your code:
<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin-version}</version>
        <configuration>
          <!-- Bind Groovy Eclipse Compiler -->
          <compilerId>groovy-eclipse-compiler</compilerId>
          <source>${jdk-version}</source>
          <target>${jdk-version}</target>
        </configuration>
        <dependencies>
          <!-- Define which Groovy version will be used for the build (default is 2.0) -->
          <dependency>
            <groupId>org.codehaus.groovy</groupId>
            <artifactId>groovy-eclipse-batch</artifactId>
            <version>${groovy-eclipse-batch-version}</version>
          </dependency>
          <!-- Define dependency to Groovy Eclipse Compiler (as it's referred to in compilerId) -->
          <dependency>
            <groupId>org.codehaus.groovy</groupId>
            <artifactId>groovy-eclipse-compiler</artifactId>
            <version>${groovy-eclipse-compiler-version}</version>
          </dependency>
        </dependencies>
      </plugin>
      <!-- Define Groovy Eclipse Compiler again and set extensions=true. Thanks to this, the plugin will -->
      <!-- enhance the default build life cycle with an extra phase which adds additional Groovy source folders. -->
      <!-- It works fine under Maven 3.x, but we've encountered problems with Maven 2.x -->
      <plugin>
        <groupId>org.codehaus.groovy</groupId>
        <artifactId>groovy-eclipse-compiler</artifactId>
        <version>${groovy-eclipse-compiler-version}</version>
        <extensions>true</extensions>
      </plugin>
      <!-- Configure Clover for Maven plug-in. Please note that it's not bound to any execution phase, -->
      <!-- so you'll have to call Clover goals from the command line. -->
      <plugin>
        <groupId>com.atlassian.maven.plugins</groupId>
        <artifactId>maven-clover2-plugin</artifactId>
        <version>${maven-clover2-plugin-version}</version>
        <configuration>
          <generateHtml>true</generateHtml>
          <historyDir>.cloverhistory</historyDir>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
</build>
This should solve it.
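For completeness, the Cassandra job setup that precedes newAPIHadoopRDD in the original Scala example (CassandraCQLTest) can be ported to Java roughly as below. This is only a sketch: buildCassandraJobConf is a hypothetical helper, the host, port, keyspace, table, and page-size values are placeholders, and it assumes the ConfigHelper/CqlConfigHelper classes from cassandra-all that the Scala example uses:

import java.io.IOException;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical helper: builds the Hadoop Configuration that newAPIHadoopRDD reads from.
static Configuration buildCassandraJobConf(String host, String keyspace, String table) throws IOException {
    Job job = new Job(); // as in the Scala example; Job.getInstance() on newer Hadoop versions
    job.setInputFormatClass(CqlPagingInputFormat.class);
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);          // Cassandra node to contact
    ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");               // Thrift port (placeholder)
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), keyspace, table); // what to read
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");        // CQL rows fetched per page
    return job.getConfiguration();
}

The returned Configuration is then passed to newAPIHadoopRDD as shown earlier.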
Original question (asked by martingms):
I am trying to port an example written in Scala (from the Apache Spark project) into Java, and running into some issues.
The code
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[CqlPagingInputFormat], classOf[java.util.Map[String,ByteBuffer]], classOf[java.util.Map[String,ByteBuffer]])
from the original Scala example builds and runs just fine, but
JavaPairRDD rdd = jsc.newAPIHadoopRDD(job.getConfiguration(), CqlPagingInputFormat.class, java.util.Map<String, ByteBuffer>.class, java.util.Map<String, ByteBuffer>.class);
is not allowed in Java (Cannot select from parameterized type). Changing java.util.Map<String, ByteBuffer>.class into Class.forName("java.util.Map<String, ByteBuffer>") yields a new error:
Error:(42, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
  required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
  found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<capture#1 of ?>,java.lang.Class<capture#2 of ?>
  reason: inferred type does not conform to declared bound(s)
    inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
    bound(s): org.apache.hadoop.mapreduce.InputFormat<capture#1 of ?,capture#2 of ?>
Changing it into simply java.util.Map.class yields a similar error:

Error:(44, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
  required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
  found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<java.util.Map>,java.lang.Class<java.util.Map>
  reason: inferred type does not conform to declared bound(s)
    inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
    bound(s): org.apache.hadoop.mapreduce.InputFormat<java.util.Map,java.util.Map>
So what is the correct translation? It is worth noting that the newAPIHadoopRDD() function has different implementations for Scala and for Java. Documentation for the methods can be found in the Scala API docs and, for Java, here: http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#newAPIHadoopRDD(org.apache.hadoop.conf.Configuration, java.lang.Class, java.lang.Class, java.lang.Class)

The declaration of CqlPagingInputFormat looks like this:

public class CqlPagingInputFormat extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat<java.util.Map<java.lang.String,java.nio.ByteBuffer>,java.util.Map<java.lang.String,java.nio.ByteBuffer>> {