Addition of two RDD[mllib.linalg.Vector]'s

11,587

Solution 1

This is actually a good question. I work with mllib regularly and did not realize these basic linear algebra operations are not easily accessible.

The point is that the underlying breeze vectors have all of the linear algebra manipulations you would expect - including of course basic element wise addition that you specifically mentioned.

However the breeze implementation is hidden from the outside world via:

[private mllib]

So then, from the outside world/public API perspective, how do we access those primitives?

Some of them are already exposed: e.g. sum of squares:

/**
 * Returns the squared distance between two Vectors.
 * @param v1 first Vector.
 * @param v2 second Vector.
 * @return squared distance between two Vectors.
 */
def sqdist(v1: Vector, v2: Vector): Double = { 
  ...
}

However the selection of such available methods is limited - and in fact does not include the basic operations including element wise addition, subtraction, multiplication, etc.

So here is the best I could see:

  • Convert the vectors to breeze:
  • Perform the vector operations in breeze
  • Convert back from breeze to mllib Vector

Here is some sample code:

val v1 = Vectors.dense(1.0, 2.0, 3.0)
val v2 = Vectors.dense(4.0, 5.0, 6.0)
val bv1 = new DenseVector(v1.toArray)
val bv2 = new DenseVector(v2.toArray)

val vectout = Vectors.dense((bv1 + bv2).toArray)
vectout: org.apache.spark.mllib.linalg.Vector = [5.0,7.0,9.0]

Solution 2

The following code exposes asBreeze and fromBreeze methods from Spark. This solution supports SparseVector in contrast to using vector.toArray. Note that Spark may change their API in the future and already has renamed toBreeze to asBreeze.

package org.apache.spark.mllib.linalg
import breeze.linalg.{Vector => BV}
import org.apache.spark.sql.functions.udf

/** expose vector.toBreeze and Vectors.fromBreeze
  */
object VectorUtils {

  def fromBreeze(breezeVector: BV[Double]): Vector = {
    Vectors.fromBreeze( breezeVector )
  }

  def asBreeze(vector: Vector): BV[Double] = {
    // this is vector.asBreeze in Spark 2.0
    vector.toBreeze
  }

  val addVectors = udf {
    (v1: Vector, v2: Vector) => fromBreeze( asBreeze(v1) + asBreeze(v2) )
  }

}

With this you can do df.withColumn("xy", addVectors($"x", $"y")).

Share:
11,587

Related videos on Youtube

krishna
Author by

krishna

Updated on September 14, 2022

Comments

  • krishna
    krishna over 1 year

    I need addition of two matrices that are stored in two files.

    The content of latest1.txt and latest2.txt has the next str:

    1 2 3
    4 5 6
    7 8 9
    

    I am reading those files as follows:

    scala> val rows = sc.textFile(“latest1.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
        Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
    }
    
    scala> val r1 = rows
    r1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14
    
    scala> val rows = sc.textFile(“latest2.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
        Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
    }
    
    scala> val r2 = rows
    r2: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14
    

    I want to add r1, r2. So, Is there any way to add this two RDD[mllib.linalg.Vector]s in Apache-Spark.

  • Shyamendra Solanki
    Shyamendra Solanki about 9 years
    Yes. MLlib is not a complete linear algebra library, Breeze should be used if such operations are needed.
  • hidemyname
    hidemyname over 8 years
    But what if the vector is sparse. I am currently manipulating the sparse vector. But if using your way to convert the vector, it will cost much more memory and slow down the calculating speed. It is weird that pyspark can do this operation easily. So I am thinking using python instead.
  • Stefan Falk
    Stefan Falk about 8 years
    This is actually what I am trying.. but what am I doing wrong here?
  • WestCoastProjects
    WestCoastProjects about 8 years
    @displayname I answered that question.
  • WestCoastProjects
    WestCoastProjects over 6 years
    @HahaTTpro did you import org.apache.spark.ml.linalg.DenseVector ?
  • Sai Kiriti Badam
    Sai Kiriti Badam about 6 years
    @javadba How much do you think the performance will be affected when dealing with Sparse vectors? I'm dealing with Spark vectors of length 2**20 and I can't seem to find an efficient way to deal with this in Scala.
  • Scott H
    Scott H almost 6 years
    Shouldn't the first line be import org.apache.spark.mllib.linalg._ instead of a package definition? If I use as is I get an error saying "illegal start of definition".
  • Jussi Kujala
    Jussi Kujala over 5 years
    @scottH no, because functions need to be part of the package to get access to private functions. The code worked fine in Spark 1.6.1, but Spark 2+ has changed things. Did you try to compile the code into JAR instead of copy pasting to spark-shell?