Apache Spark: map vs mapPartitions?

152,113

Solution 1

What's the difference between an RDD's map and mapPartitions method?

The method map converts each element of the source RDD into a single element of the result RDD by applying a function. mapPartitions converts each partition of the source RDD into multiple elements of the result (possibly none).

And does flatMap behave like map or like mapPartitions?

Neither, flatMap works on a single element (as map) and produces multiple elements of the result (as mapPartitions).

Solution 2

Imp. TIP :

Whenever you have heavyweight initialization that should be done once for many RDD elements rather than once per RDD element, and if this initialization, such as creation of objects from a third-party library, cannot be serialized (so that Spark can transmit it across the cluster to the worker nodes), use mapPartitions() instead of map(). mapPartitions() provides for the initialization to be done once per worker task/thread/partition instead of once per RDD data element for example : see below.

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2. does flatMap behave like map or like mapPartitions?

Yes. please see example 2 of flatmap.. its self explanatory.

Q1. What's the difference between an RDD's map and mapPartitions

map works the function being utilized at a per element level while mapPartitions exercises the function at the partition level.

Example Scenario : if we have 100K elements in a particular RDD partition then we will fire off the function being used by the mapping transformation 100K times when we use map.

Conversely, if we use mapPartitions then we will only call the particular function one time, but we will pass in all 100K records and get back all responses in one function call.

There will be performance gain since map works on a particular function so many times, especially if the function is doing something expensive each time that it wouldn't need to do if we passed in all the elements at once(in case of mappartitions).

map

Applies a transformation function on each item of the RDD and returns the result as a new RDD.

Listing Variants

def map[U: ClassTag](f: T => U): RDD[U]

Example :

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

This is a specialized map that is called only once for each partition. The entire content of the respective partitions is available as a sequential stream of values via the input argument (Iterarator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. Please note, that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose.

preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn't modify the keys.

Listing Variants

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

Example 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Example 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

The above program can also be written using flatMap as follows.

Example 2 using flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Conclusion :

mapPartitions transformation is faster than map since it calls your function once/partition, not once/element..

Further reading : foreach Vs foreachPartitions When to use What?

Solution 3

Map :

  1. It processes one row at a time , very similar to map() method of MapReduce.
  2. You return from the transformation after every row.

MapPartitions

  1. It processes the complete partition in one go.
  2. You can return from the function only once after processing the whole partition.
  3. All intermediate results needs to be held in memory till you process the whole partition.
  4. Provides you like setup() map() and cleanup() function of MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

Share:
152,113

Related videos on Youtube

Nicholas White
Author by

Nicholas White

Updated on February 23, 2021

Comments

  • Nicholas White
    Nicholas White about 3 years

    What's the difference between an RDD's map and mapPartitions method? And does flatMap behave like map or like mapPartitions? Thanks.

    (edit) i.e. what is the difference (either semantically or in terms of execution) between

      def map[A, B](rdd: RDD[A], fn: (A => B))
                   (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
        rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
          preservesPartitioning = true)
      }
    

    And:

      def map[A, B](rdd: RDD[A], fn: (A => B))
                   (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
        rdd.map(fn)
      }
    
  • Nicholas White
    Nicholas White over 10 years
    Thanks - so does map cause shuffles (or otherwise change the number of partitions)? Does it move data between nodes? I've been using mapPartitions to avoid moving data between nodes, but wasn't sure if flapMap would do so.
  • Alexey Romanov
    Alexey Romanov over 10 years
    If you look at the source -- github.com/apache/incubator-spark/blob/… and github.com/apache/incubator-spark/blob/… -- both map and flatMap have exactly the same partitions as the parent.
  • Mikel Urkia
    Mikel Urkia over 9 years
    As a note, a presentation provided by a speaker at the 2013 San Francisco Spark Summit (goo.gl/JZXDCR) highlights that tasks with high per-record overhead perform better with a mapPartition than with a map transformation. This is, according to the presentation, due to the high cost of setting up a new task.
  • Daniel Langdon
    Daniel Langdon over 9 years
    That is indeed the case, if you can indeed do setup a single time for the entire partition. The example they give is the need to open a DB connection for instance (no need to do it on every record)
  • Daniel Langdon
    Daniel Langdon over 9 years
    That said, not sure if there is a difference in parallel execution and memory usage between map and mapPartitions. For instance, map could work in parallel implicitly, mapPartitions forces you to iterate. Thus computation could be faster with map but if your execution on a single tuple uses a lot of temporary memory, mapPartitions could avoid GC and memory issues. No idea if this is the way it actually works, but my anecdotal evidence seems to imply this. Would love to have confirmation.
  • Bob
    Bob about 9 years
    I'm seeing the opposite -- even with very small operations, its faster to call mapPartitions and iterate than call map. I am assuming that this is just the overhead of starting the language engine that will process the map task. (I'm in R, which may have more startup overhead.) If you would be performing multiple operations, then mapPartitions seems to be quite a bit faster -- I'm assuming this is because it reads the RDD only once. Even if the RDD is cached in RAM, that saves a lot of overhead from type conversion.
  • NightWolf
    NightWolf over 8 years
    map basically takes your function f, and passes it into iter.map(f). So basically its a convenience method that wraps mapPartitions. I'd be surprised if there was a performance advantage either way for a pure map style transformation job (i.e. where the function is identical), if you need to create some objects for processing, if these objects can be shared then mapPartitions would be advantageous.
  • johannesv
    johannesv about 8 years
    Using mapPartitions seems to make sense performance wise, if you have a reduce operation, that can be split up in a (1) 'reduce per partition' operation and a (2) 'reduce partition results' operation. The mapPartition can return an Iterable with 'number of partitions' elements, that can then be reduced to the final result.
  • Nicholas White
    Nicholas White over 7 years
    I know that you can use map or mapPartitions to achieve the same result (see the two examples in the question); this question is about why you'd choose one way over the other. The comments in the other answer are really useful! Also, you didn't mention that map and flatMap pass false to preservesPartitioning, and what the implications of that are.
  • Semicolons and Duct Tape
    Semicolons and Duct Tape over 7 years
    the function executed everytime versus function execute once for the parition was the link I was missing. Having access to more than one data record at a time with mapPartition is an invaluable thing. appreciate the answer
  • ruhong
    ruhong about 7 years
    Is there a scenario where map is better than mapPartitions? If mapPartitions is so good, why isn't it the default map implementation?
  • Ram Ghadiyaram
    Ram Ghadiyaram about 7 years
    @oneleggedmule: both are for different requirements we have to use wisely if you are instantiating resources like db connections (like shown in the above example) which are costly then mappartitions is right approach since one connection per partition. also saveAsTextFile internally used mappartitions see
  • Raymond Chen
    Raymond Chen about 7 years
    @oneleggedmule From my point of view, map() is easier to understand and learn, and it is also a common method of many different languages. It may be easier to use as well than mapPartitions() if someone is not familiar with this Spark specific method at the beginning. If there is no performance difference then I prefer to use map().
  • ilcord
    ilcord almost 7 years
    regarding 2 - if you're performing iterator-to-iterator transformations, and not materializing the iterator to a collection of some sort, you won't have to hold the entire partition in memory, in fact, that way spark will be able to spill parts of the partition to disk.
  • KrazyGautam
    KrazyGautam almost 7 years
    You dont have to hold the entire partition in memory, but the result. You cannot return the result until you have processed the whole partition
  • thebluephantom
    thebluephantom almost 7 years
    So if the results are not entirely correct, then the use of mapPartitions is less handy. That said the number of errors is low and it is trade off. I am wondering why the architecture is not standardly mapPartitions based.
  • Ziu
    Ziu over 4 years
    @RamGhadiyaram Can you explain more detailed about "why tuples (3,4) and (6,7) are missing?"