Spark: Efficient mass lookup in pair RDD's
Your massPairLookup1 doesn't work because you cannot invoke RDD operations (transformations, or actions such as lookup) inside workers, that is, inside another transformation.
In massPairLookup2, I don't think it's necessary to perform a full cartesian product.
You can do it like this:
val firstjoin = pairs.map({case (k1,k2) => (k1, (k1,k2))})
    .join(data)
    .map({case (_, ((k1, k2), v1)) => ((k1, k2), v1)})
val result = firstjoin.map({case ((k1,k2),v1) => (k2, ((k1,k2),v1))})
    .join(data)
    .map({case(_, (((k1,k2), v1), v2))=>((k1, k2), (v1, v2))})
Or in a more dense form:
val firstjoin = pairs.map(x => (x._1, x)).join(data).map(_._2)
val result = firstjoin.map({case (x,y) => (x._2, (x,y))})
    .join(data).map({case(x, (y, z))=>(y._1, (y._2, z))})
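To try the two-join approach end to end, here is a minimal sketch that runs it in local mode on the sample data from the question. The object name PairLookupDemo, the local[*] master and the app name are assumptions made for illustration; only the join logic comes from the code above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
// On Spark versions before 1.3 you would also need:
// import org.apache.spark.SparkContext._

// Hypothetical wrapper object, not part of the original answer.
object PairLookupDemo {
  // Attach v1 by joining on k1, then re-key by k2 and join again to attach v2.
  def pairsWithData(pairs: RDD[(Int, Int)],
                    data: RDD[(Int, String)]): RDD[((Int, Int), (String, String))] = {
    val firstJoin = pairs
      .map(p => (p._1, p))   // (k1, (k1, k2))
      .join(data)            // (k1, ((k1, k2), v1))
      .map(_._2)             // ((k1, k2), v1)
    firstJoin
      .map { case (p, v1) => (p._2, (p, v1)) }            // (k2, ((k1, k2), v1))
      .join(data)                                         // (k2, (((k1, k2), v1), v2))
      .map { case (_, ((p, v1), v2)) => (p, (v1, v2)) }   // ((k1, k2), (v1, v2))
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for a quick test run.
    val conf = new SparkConf().setMaster("local[*]").setAppName("pair-lookup-demo")
    val sc = new SparkContext(conf)
    try {
      val data = sc.parallelize(1 to 12).map(x => (x, "Number " + x))
      val pairs = sc.parallelize(Seq((1, 12), (2, 6), (3, 4)))
      // collect() is fine for this tiny sample; avoid it on real data sizes.
      pairsWithData(pairs, data).collect().sortBy(_._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Each join shuffles roughly |pairs| + |data| records instead of materializing |data|^2 candidate pairs, which is where the difference to the cartesian version comes from.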
I don't think you can do it more efficiently, but I might be wrong...
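One possible refinement, which I have not benchmarked: since data takes part in both joins, it can be hash-partitioned once, persisted, and the same partitioner passed to both joins, so that only the pairs side has to be shuffled each time. The sketch below assumes this setup; the object name, the partition count of 4 and the local master are made up for illustration, and whether it helps will depend on the cluster and the data sizes.

```scala
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper object, not part of the original answer.
object PartitionedPairLookup {
  // Assumes `data` was already partitioned with `partitioner` (see main),
  // so each join can use it without shuffling it again.
  def pairsWithData(pairs: RDD[(Int, Int)],
                    data: RDD[(Int, String)],
                    partitioner: Partitioner): RDD[((Int, Int), (String, String))] = {
    val firstJoin = pairs
      .map(p => (p._1, p))
      .join(data, partitioner)   // ((k1, k2), v1) after dropping the join key
      .map(_._2)
    firstJoin
      .map { case (p, v1) => (p._2, (p, v1)) }
      .join(data, partitioner)
      .map { case (_, ((p, v1), v2)) => (p, (v1, v2)) }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("partitioned-pair-lookup")
    val sc = new SparkContext(conf)
    try {
      // The partition count is an arbitrary choice for this small example.
      val partitioner = new HashPartitioner(4)
      val data = sc.parallelize(1 to 12)
        .map(x => (x, "Number " + x))
        .partitionBy(partitioner)   // shuffle data once
        .persist()                  // keep the partitioned copy for both joins
      val pairs = sc.parallelize(Seq((1, 12), (2, 6), (3, 4)))
      pairsWithData(pairs, data, partitioner).collect().sortBy(_._1).foreach(println)
      data.unpersist()
    } finally {
      sc.stop()
    }
  }
}
```

The result should be the same as with the plain joins; only the physical layout of data changes.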
Tobber
Updated on June 17, 2022
Comments
-
Tobber almost 2 years
In Apache Spark I have two RDDs. The first, data : RDD[(K,V)], contains data in key-value form. The second, pairs : RDD[(K,K)], contains a set of interesting key pairs of this data.
How can I efficiently construct an RDD pairsWithData : RDD[((K,K),(V,V))], such that it contains all the elements from pairs as the key-tuple and their corresponding values (from data) as the value-tuple?
Some properties of the data:
- The keys in data are unique
- All entries in pairs are unique
- For all pairs (k1,k2) in pairs it is guaranteed that k1 <= k2
- The size of pairs is only a constant factor times the size of data: |pairs| = O(|data|)
- Current data sizes (expected to grow): |data| ~ 10^8, |pairs| ~ 10^10
Current attempts
Here is some example code in Scala:
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

// This kind of shows the idea, but fails at runtime.
def massPairLookup1(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
  keyPairs map { case (k1,k2) =>
    val v1 : String = data.lookup(k1).head
    val v2 : String = data.lookup(k2).head
    ((k1, k2), (v1,v2))
  }
}

// Works but is O(|data|^2)
def massPairLookup2(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = {
  // Construct all possible pairs of values
  val cartesianData = data cartesian data map { case ((k1,v1),(k2,v2)) => ((k1,k2),(v1,v2)) }
  // Select only the values whose keys are in keyPairs
  keyPairs map {(_,0)} join cartesianData mapValues {_._2}
}

// Example function that finds pairs of keys
// Runs in O(|data|) in real life, but cannot maintain the values
def relevantPairs(data : RDD[(Int, String)]) = {
  val keys = data map (_._1)
  keys cartesian keys filter { case (x,y) => x*y == 12 && x < y }
}

// Example run
val data = sc parallelize(1 to 12) map (x => (x, "Number " + x))
val pairs = relevantPairs(data)
val pairsWithData = massPairLookup2(pairs, data)

// Print:
// ((1,12),(Number 1,Number 12))
// ((2,6),(Number 2,Number 6))
// ((3,4),(Number 3,Number 4))
pairsWithData.foreach(println)
Attempt 1
First I tried just using the lookup function on data, but that throws a runtime error when executed. It seems like self is null in the PairRDDFunctions trait.
In addition, I am not sure about the performance of lookup. The documentation says: "This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to." This sounds like n lookups take O(n*|partition|) time at best, which I suspect could be optimized.
Attempt 2
This attempt works, but I create |data|^2 pairs, which will kill performance. I do not expect Spark to be able to optimize that away.
-
Tobber over 9 years
Works great. I did a quick local benchmark: with |pairs| = 144 244 and |data| = 10 000, the run time was 2.91 s for this and 980.45 s for massPairLookup2. See the code at this gist.
-
Daniel Darabos over 9 years
I've changed the code a bit. I think it's easier to read like this. Can you please check to make sure I did not mess it up?
-
pzecevic over 9 years
You are right, it wasn't very readable. Thank you. But I think you made a mistake there... I'll fix it.