Spark: Efficient mass lookup in pair RDDs


Your massPairLookup1 doesn't work because you cannot perform RDD operations such as lookup inside workers, i.e. inside another transformation.

For massPairLookup2, I don't think it's necessary to compute the full cartesian product; two joins are enough.

You can do it like this:

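    // Key each pair by k1 and join with data to attach v1
    val firstjoin = pairs.map({case (k1,k2) => (k1, (k1,k2))})
        .join(data)
        .map({case (_, ((k1, k2), v1)) => ((k1, k2), v1)})
    // Re-key by k2 and join with data again to attach v2
    val result = firstjoin.map({case ((k1,k2),v1) => (k2, ((k1,k2),v1))})
        .join(data)
        .map({case(_, (((k1,k2), v1), v2))=>((k1, k2), (v1, v2))})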

Or, in a denser form:

    val firstjoin = pairs.map(x => (x._1, x)).join(data).map(_._2)
    val result = firstjoin.map({case (x,y) => (x._2, (x,y))})
        .join(data).map({case(x, (y, z))=>(y._1, (y._2, z))})

I don't think you can do it more efficiently, but I might be wrong...
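To see what each join does, here is a sketch of the same two-join approach on plain Scala collections instead of RDDs. innerJoin is a hypothetical helper that mimics the inner-join semantics of join on pair RDDs for local data; it is not Spark code and ignores partitioning and shuffles.

```scala
// Spark-free analogue of the two-join approach on local collections.
object TwoJoinSketch {
  // Hypothetical helper: inner join of two keyed sequences,
  // emitting (k, (a, b)) for every pair of records sharing key k.
  def innerJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val rightByKey: Map[K, Seq[B]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, a) <- left
      b <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (a, b))
  }

  def pairsWithData[K, V](pairs: Seq[(K, K)], data: Seq[(K, V)]): Seq[((K, K), (V, V))] = {
    // First join: key each pair by k1 and attach v1.
    val firstJoin: Seq[((K, K), V)] =
      innerJoin(pairs.map(p => (p._1, p)), data).map(_._2)
    // Second join: re-key by k2 and attach v2.
    innerJoin(firstJoin.map { case (p, v1) => (p._2, (p, v1)) }, data)
      .map { case (_, ((p, v1), v2)) => (p, (v1, v2)) }
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 12).map(x => (x, s"Number $x"))
    val pairs = Seq((1, 12), (2, 6), (3, 4))
    pairsWithData(pairs, data).foreach(println)
  }
}
```

Each join emits at most one record per pair because the keys in data are unique, so no intermediate result grows beyond |pairs|.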

Updated on June 17, 2022


  Tobber (almost 2 years ago)

    I have two RDDs in Apache Spark. The first, data : RDD[(K,V)], contains data in key-value form. The second, pairs : RDD[(K,K)], contains a set of interesting key pairs of this data.

    How can I efficiently construct an RDD pairsWithData : RDD[((K,K),(V,V))] that contains every element of pairs as the key tuple and the corresponding values (from data) as the value tuple?
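    To pin down the expected result, here is a minimal Spark-free sketch of the semantics on plain Scala collections. It assumes data fits in an in-memory Map (it does not at the sizes listed below, so this is only a specification, not a solution), and it drops a pair if either key is missing, mirroring an inner join. The object name PairsWithDataSpec is hypothetical.

```scala
// Reference semantics for pairsWithData on local collections (not Spark).
// Assumption: data fits in memory as a Map; used only to specify the output.
object PairsWithDataSpec {
  def pairsWithData[K, V](pairs: Seq[(K, K)], data: Map[K, V]): Seq[((K, K), (V, V))] =
    pairs.collect {
      // Keep a pair only if both keys exist in data (inner-join behaviour).
      case (k1, k2) if data.contains(k1) && data.contains(k2) =>
        ((k1, k2), (data(k1), data(k2)))
    }

  def main(args: Array[String]): Unit = {
    val data = (1 to 12).map(x => x -> s"Number $x").toMap
    val pairs = Seq((1, 12), (2, 6), (3, 4))
    pairsWithData(pairs, data).foreach(println)
  }
}
```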

    Some properties of the data:

    • The keys in data are unique
    • All entries in pairs are unique
    • For all pairs (k1,k2) in pairs it is guaranteed that k1 <= k2
    • The size of pairs is at most a constant factor times the size of data: |pairs| = O(|data|)
    • Current data sizes (expected to grow): |data| ~ 10^8, |pairs| ~ 10^10

    Current attempts

    Here is some example code in Scala:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.SparkContext._

    // This shows the idea, but fails at runtime: data.lookup is an RDD
    // operation and cannot be called inside keyPairs.map.
    def massPairLookup1(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
      keyPairs map {case (k1,k2) =>
        val v1 : String = data.lookup(k1).head
        val v2 : String = data.lookup(k2).head
        ((k1, k2), (v1,v2))
      }
    }

    // Works but is O(|data|^2)
    def massPairLookup2(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
      // Construct all possible pairs of values
      val cartesianData = data cartesian data map {case((k1,v1),(k2,v2)) => ((k1,k2),(v1,v2))}
      // Select only the values whose keys are in keyPairs
      keyPairs map {(_,0)} join cartesianData mapValues {_._2}
    }

    // Example function that finds pairs of keys
    // Runs in O(|data|) in real life, but cannot maintain the values
    def relevantPairs(data : RDD[(Int, String)]) = {
      val keys = data map (_._1)
      keys cartesian keys filter {case (x,y) => x*y == 12 && x < y}
    }

    // Example run (sc is the SparkContext, e.g. in spark-shell)
    val data = sc parallelize(1 to 12) map (x => (x, "Number " + x))
    val pairs = relevantPairs(data)
    val pairsWithData = massPairLookup2(pairs, data)
    pairsWithData.collect().foreach(println)
    // Prints (order may vary):
    // ((1,12),(Number 1,Number 12))
    // ((2,6),(Number 2,Number 6))
    // ((3,4),(Number 3,Number 4))

    Attempt 1

    First I tried just using the lookup function on data, but that throws a runtime error when executed. It seems that self is null in PairRDDFunctions.

    In addition, I am not sure about the performance of lookup. The documentation says: "This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to." This suggests that n lookups take O(n*|partition|) time at best, which I suspect could be optimized.
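    The cost argument can be illustrated with a toy model (not Spark's implementation): records are split into partitions by a non-negative key.hashCode modulo the partition count, assumed here to match what a hash partitioner does, and a lookup scans only the partition its key maps to, so each lookup costs the size of that partition. All names in the sketch are hypothetical.

```scala
// Toy cost model of lookup on a hash-partitioned dataset (not Spark code).
object LookupCostSketch {
  // Non-negative key.hashCode modulo numPartitions.
  def partitionIndex[K](key: K, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Group records into numPartitions buckets by partitionIndex.
  def partitionData[K, V](data: Seq[(K, V)], numPartitions: Int): Vector[Vector[(K, V)]] = {
    val buckets = data.groupBy { case (k, _) => partitionIndex(k, numPartitions) }
    Vector.tabulate(numPartitions)(i => buckets.getOrElse(i, Seq.empty).toVector)
  }

  // Scan only the partition the key maps to.
  // Returns the matching values and the number of records scanned.
  def lookup[K, V](parts: Vector[Vector[(K, V)]], key: K): (Seq[V], Int) = {
    val part = parts(partitionIndex(key, parts.length))
    (part.collect { case (k, v) if k == key => v }, part.length)
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 12).map(x => (x, s"Number $x"))
    val parts = partitionData(data, 4) // 4 partitions of 3 records each
    val results = Seq(1, 2, 3).map(k => lookup(parts, k))
    results.foreach(println)
    // n lookups scan n * |partition| records in total.
    println(s"records scanned: ${results.map(_._2).sum}")
  }
}
```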

    Attempt 2

    This attempt works, but it creates |data|^2 pairs, which will kill performance. I do not expect Spark to be able to optimize that away.
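    A back-of-the-envelope comparison using the sizes quoted above (|data| ~ 10^8, |pairs| ~ 10^10) shows the gap. Since the keys in data are unique, each join of pairs with data emits at most one output record per pair, so two joins produce at most 2|pairs| records, while data cartesian data alone materializes |data|^2. The object name is hypothetical, and only output record counts are compared, not shuffle cost.

```scala
// Record counts for the cartesian approach vs. two joins, from the quoted sizes.
object IntermediateSizes {
  val dataSize: BigInt = BigInt(10).pow(8)   // |data| ~ 10^8
  val pairsSize: BigInt = BigInt(10).pow(10) // |pairs| ~ 10^10

  // massPairLookup2: data cartesian data yields |data|^2 records.
  val cartesianRecords: BigInt = dataSize * dataSize

  // Two joins: unique keys in data => at most one output record per pair per join.
  val twoJoinRecords: BigInt = pairsSize * BigInt(2)

  def main(args: Array[String]): Unit = {
    println(s"cartesian: $cartesianRecords records")
    println(s"two joins: at most $twoJoinRecords records")
    println(s"ratio: ${cartesianRecords / twoJoinRecords}")
  }
}
```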

  • Tobber (over 9 years ago)
    Works great. I did a quick local benchmark: with |pairs| = 144 244 and |data| = 10 000, the run time was 2.91 s for this approach and 980.45 s for massPairLookup2. See the code at this gist
  • Daniel Darabos (over 9 years ago)
    I've changed the code a bit. I think it's easier to read like this. Can you please check to make sure I did not mess it up?
  • pzecevic (over 9 years ago)
    You are right, it wasn't very readable. Thank you. But I think you made a mistake there... I'll fix it.