Spark UnsupportedOperationException: empty collection

Solution 1

This is probably caused by one of the filters you (or the computeRmse method) apply: it produces an empty RDD, so reduce gets called on an empty collection and "empty collection" is thrown. Double-check the filters and the computeRmse() function.
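
If you want the RMSE computation to fail gracefully rather than throw, one option (a sketch, not part of the original answer; it mirrors the computeRmse body from the code posted in the question below) is to replace reduce(_ + _) with fold(0.0)(_ + _), which is safe on an empty RDD:

    /** Sketch: RMSE that tolerates an empty predictions RDD instead of throwing. */
    def computeRmseSafe(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
      val predictions = model.predict(data.map(x => (x.user, x.product)))
      val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
        .join(data.map(x => ((x.user, x.product), x.rating)))
        .values
      // fold(0.0)(_ + _) returns 0.0 on an empty RDD, unlike reduce(_ + _)
      val sse = predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).fold(0.0)(_ + _)
      if (n == 0) Double.NaN else math.sqrt(sse / n)
    }

A result of NaN (or an RMSE of 0.0 on non-empty data) then signals that the predictions were empty without killing the job.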

Solution 2

I ran into the same problem working with the same example. The problem is that the training data I was using wasn't large enough and didn't have enough repeated values. The ALS model can only predict (user, product) ID pairs whose user and product were both present in the training data (it is somewhat different from other machine learning algorithms in that respect). So if every pair in the validation set contains an ID that wasn't in the training set, the predictions RDD will be empty (the model can't predict any of those values), and the reduce in the RMSE method will throw this exception. To avoid this you should:

A) not use this algorithm without sufficient training data, and

B) check, before entering the "finding the best model" loop, that your validation set actually overlaps with the training set (see the sketch after this list).
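
A minimal sketch of check B, assuming training and validation are RDD[Rating] as in the code posted in the question, and that the distinct ID sets fit in driver memory at this data size (the helper names are illustrative):

    // keep only validation ratings whose user AND product both occur in training,
    // since ALS can only predict (user, product) pairs it has seen
    val trainingUsers    = training.map(_.user).distinct().collect().toSet
    val trainingProducts = training.map(_.product).distinct().collect().toSet
    val usableValidation = validation.filter { r =>
      trainingUsers.contains(r.user) && trainingProducts.contains(r.product)
    }
    require(usableValidation.count() > 0, "validation set has no overlap with the training set")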

Lastly, if you are productizing this algorithm, make sure that you don't use the best model returned by this loop directly, because it is likely not to contain all of your user and product IDs. If that's the case, you are restricting the new users and products that you can predict for. What I would recommend is to use this logic only to discern the correct training parameters and then, with those parameters, train the model on all the data and use that.
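
A sketch of that last step, reusing the variable names from the question's code (training, validation, test, bestRank, bestLambda, bestNumIter):

    // retrain on ALL ratings with the best hyperparameters from the search,
    // so the final model covers every user and product ID available
    val allRatings = training.union(validation).union(test)
    val finalModel = ALS.train(allRatings, bestRank, bestNumIter, bestLambda)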

Solution 3

I came across the exact same exception. In my case it was a bug in the code that caused the actual ratings RDD to have size zero :) By passing an empty ratings RDD to ALS.train I definitely deserved to get UnsupportedOperationException: empty collection.
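
A cheap way to catch that class of bug early is to fail fast with a clear message (a sketch, assuming your input RDD is named ratings; count() forces evaluation, which is acceptable for a one-off sanity check):

    // fail fast instead of hitting "empty collection" deep inside ALS.train or computeRmse
    require(ratings.count() > 0, "ratings RDD is empty -- check upstream parsing/filters")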


Comments

  • user3731845 almost 2 years

    Does anyone know possible causes of this error when trying to run the Spark MLlib ALS hands-on lab provided by Databricks?

    14/11/20 23:33:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    14/11/20 23:33:39 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
    Got 27980 ratings from 24071 users on 4211 movies.
    Training: 27989, validation: 0, test: 0
    Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:806)
        at MovieLensALS$.computeRmse(MovieLensALS.scala:149)
        at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply$mcVI$sp(MovieLensALS.scala:95)
        at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply(MovieLensALS.scala:93)
        at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply(MovieLensALS.scala:93)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply$mcVD$sp(MovieLensALS.scala:93)
        at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply(MovieLensALS.scala:93)
        at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply(MovieLensALS.scala:93)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at MovieLensALS$$anonfun$main$1.apply$mcVI$sp(MovieLensALS.scala:93)
        at MovieLensALS$$anonfun$main$1.apply(MovieLensALS.scala:93)
        at MovieLensALS$$anonfun$main$1.apply(MovieLensALS.scala:93)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at MovieLensALS$.main(MovieLensALS.scala:93)
        at MovieLensALS.main(MovieLensALS.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    

    UPDATE: Sure thing! I am using this class. It is available at https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html and https://databricks-training.s3.amazonaws.com/getting-started.html#additional-required-download. Let me know if there is something else that could help.

    import java.io.File
    
    import scala.io.Source
    
    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd._
    import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
    
    object MovieLensALS {
    
      def main(args: Array[String]) {
    
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    
        if (args.length != 2) {
          println("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g --class MovieLensALS " +
            "target/scala-*/movielens-als-ssembly-*.jar movieLensHomeDir personalRatingsFile")
          sys.exit(1)
        }
    
        // set up environment
    
        val conf = new SparkConf()
          .setAppName("MovieLensALS")
          .set("spark.executor.memory", "2g")
        val sc = new SparkContext(conf)
    
        // load personal ratings
    
        val myRatings = loadRatings(args(1))
        val myRatingsRDD = sc.parallelize(myRatings, 1)
    
        // load ratings and movie titles
    
        val movieLensHomeDir = args(0)
    
        val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line =>
          val fields = line.split("::")
          // format: (timestamp % 10, Rating(userId, movieId, rating))
          (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
        }
    
        val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line =>
          val fields = line.split("::")
          // format: (movieId, movieName)
          (fields(0).toInt, fields(1))
        }.collect().toMap
    
        val numRatings = ratings.count()
        val numUsers = ratings.map(_._2.user).distinct().count()
        val numMovies = ratings.map(_._2.product).distinct().count()
    
        println("Got " + numRatings + " ratings from "
          + numUsers + " users on " + numMovies + " movies.")
    
        // split ratings into train (60%), validation (20%), and test (20%) based on the 
        // last digit of the timestamp, add myRatings to train, and cache them
    
        val numPartitions = 4
        val training = ratings.filter(x => x._1 < 6)
          .values
          .union(myRatingsRDD)
          .repartition(numPartitions)
          .cache()
        val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)
          .values
          .repartition(numPartitions)
          .cache()
        val test = ratings.filter(x => x._1 >= 8).values.cache()
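        // NOTE: the log output above shows "validation: 0, test: 0" -- these
        // filters matched nothing, which is why computeRmse later reduces an empty RDD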
    
        val numTraining = training.count()
        val numValidation = validation.count()
        val numTest = test.count()
    
        println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)
    
        // train models and evaluate them on the validation set
    
        val ranks = List(8, 12)
        val lambdas = List(0.1, 10.0)
        val numIters = List(10, 20)
        var bestModel: Option[MatrixFactorizationModel] = None
        var bestValidationRmse = Double.MaxValue
        var bestRank = 0
        var bestLambda = -1.0
        var bestNumIter = -1
        for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
          val model = ALS.train(training, rank, numIter, lambda)
          val validationRmse = computeRmse(model, validation, numValidation)
          println("RMSE (validation) = " + validationRmse + " for the model trained with rank = " 
            + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
          if (validationRmse < bestValidationRmse) {
            bestModel = Some(model)
            bestValidationRmse = validationRmse
            bestRank = rank
            bestLambda = lambda
            bestNumIter = numIter
          }
        }
    
        // evaluate the best model on the test set
    
        val testRmse = computeRmse(bestModel.get, test, numTest)
    
        println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
          + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")
    
        // create a naive baseline and compare it with the best model
    
        val meanRating = training.union(validation).map(_.rating).mean
        val baselineRmse = 
          math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean)
        val improvement = (baselineRmse - testRmse) / baselineRmse * 100
        println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")
    
        // make personalized recommendations
    
        val myRatedMovieIds = myRatings.map(_.product).toSet
        val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
        val recommendations = bestModel.get
          .predict(candidates.map((0, _)))
          .collect()
          .sortBy(- _.rating)
          .take(50)
    
        var i = 1
        println("Movies recommended for you:")
        recommendations.foreach { r =>
          println("%2d".format(i) + ": " + movies(r.product))
          i += 1
        }
    
        // clean up
        sc.stop()
      }
    
      /** Compute RMSE (Root Mean Squared Error). */
      def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
        val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
        val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
          .join(data.map(x => ((x.user, x.product), x.rating)))
          .values
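        // reduce(_ + _) throws java.lang.UnsupportedOperationException: empty collection
        // when predictionsAndRatings is empty (MovieLensALS.scala:149 in the stack trace)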
        math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
      }
    
      /** Load ratings from file. */
      def loadRatings(path: String): Seq[Rating] = {
        val lines = Source.fromFile(path).getLines()
        val ratings = lines.map { line =>
          val fields = line.split("::")
          Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
        }.filter(_.rating > 0.0)
        if (ratings.isEmpty) {
          sys.error("No ratings provided.")
        } else {
          ratings.toSeq
        }
      }
    }
    
  • Alind Billore almost 8 years
    Thanks, this was the problem with my training dataset. Some of the LIBSVM-format files had only one or two records; I guess that is what caused this error!