Spark - Random Number Generation


Solution 1

The reason why the same sequence is repeated is that the random generator is created and initialized with a seed before the data is partitioned. Each partition then starts from the same random seed. Maybe not the most efficient way to do it, but the following should work:

val myClass = new MyClass()
val M = 3

for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map { row =>
      // the generator is obtained inside the closure, on the executors,
      // rather than being seeded once on the driver and serialized out
      val rand = scala.util.Random
      RowFactory.create(
        row.getString(0),
        myClass.myMethod(row.getString(2), rand.nextDouble()))
    }, myDF.schema)
}

Solution 2

Just use the SQL function rand:

import org.apache.spark.sql.functions._

//df: org.apache.spark.sql.DataFrame = [key: int]

df.select($"key", rand() as "rand").show
+---+-------------------+
|key|               rand|
+---+-------------------+
|  1| 0.8635073400704648|
|  2| 0.6870153659986652|
|  3|0.18998048357873532|
+---+-------------------+


df.select($"key", rand() as "rand").show
+---+------------------+
|key|              rand|
+---+------------------+
|  1|0.3422484248879837|
|  2|0.2301384925817671|
|  3|0.6959421970071372|
+---+------------------+
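If reproducible numbers are needed, rand also accepts an explicit seed. A minimal sketch, reusing the myAppSeed value (91234) from the question; note the output is only stable for a given query plan and data partitioning:

df.select($"key", rand(91234L) as "rand").show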

Solution 3

According to this post, the best solution is not to put the new scala.util.Random inside the map, nor completely outside (i.e. in the driver code), but in an intermediate mapPartitionsWithIndex:

import scala.util.Random
val myAppSeed = 91234
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) =>
  val rand = new Random(indx + myAppSeed) // one generator per partition, each with a distinct seed
  iter.map(x => (x, Array.fill(10)(rand.nextDouble)))
}
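The same per-partition seeding can be carried over to a DataFrame by going through its RDD and rebuilding it with a schema. A minimal sketch, assuming myDF's first column is a string key and reusing myAppSeed from above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

// assumed output schema: the original string key plus one generated double
val schema = StructType(Seq(
  StructField("key", StringType, nullable = true),
  StructField("rand", DoubleType, nullable = false)))

val withRand = sqlContext.createDataFrame(
  myDF.rdd.mapPartitionsWithIndex { (indx, iter) =>
    val rand = new Random(indx + myAppSeed) // distinct, reproducible seed per partition
    iter.map(row => Row(row.getString(0), rand.nextDouble))
  }, schema)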

Solution 4

Using the Spark Dataset API, for example to derive a random bigint column (perhaps for use in an accumulator):

df.withColumn("_n", substring(rand(),3,4).cast("bigint"))
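The substring call reads four digits out of the decimal expansion of rand(), so _n is an integer in [0, 9999]. A roughly equivalent sketch (my own formulation, not from the original answer) that avoids the implicit string conversion:

df.withColumn("_n", (rand() * 10000).cast("bigint"))  // uniform integer in [0, 9999]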
Comments

  • Brian, almost 2 years ago

    I have written a method that must consider a random number to simulate a Bernoulli distribution. I am using random.nextDouble to generate a number between 0 and 1, and then make my decision based on that value, given my probability parameter.

    My problem is that Spark generates the same random numbers within each iteration of my for loop's mapping function. I am using the DataFrame API. My code follows this format:

    val myClass = new MyClass()
    val M = 3
    val myAppSeed = 91234
    val rand = new scala.util.Random(myAppSeed)
    
    for (m <- 1 to M) {
    val newDF = sqlContext.createDataFrame(myDF
      .map { row => RowFactory.create(
          row.getString(0),
          myClass.myMethod(row.getString(2), rand.nextDouble()))
      }, myDF.schema)
    }
    

    Here is the class:

    class MyClass extends Serializable {
      val q = qProb
    
      def myMethod(s: String, rand: Double) = {
        if (rand <= q) // do something
        else // do something else
      }
    }
    

    I need a new random number every time myMethod is called. I also tried generating the number inside my method with java.util.Random (scala.util.Random v10 does not extend Serializable) like below, but I'm still getting the same numbers within each for loop

    val r = new java.util.Random(s.hashCode.toLong)
    val rand = r.nextDouble()
    

    I've done some research, and it seems this has to do with Spark's deterministic nature.

  • Brian, about 8 years ago
    This didn't quite solve my problem, but it's an elegant solution that I will likely be using in the future, so +1.
  • Brian, about 8 years ago
    I modified this slightly to solve my problem. I passed the Random val into my method and generated the random numbers from within there. This solved my problem, but I had to use java.util.Random for serializability reasons.
  • d-xa, over 2 years ago
    I had to maintain code that used this solution and want to share with the community that it has its downsides and can badly influence your statistical analysis, so please be aware. When your RDD has more than one partition, its sequence of random numbers starts over for each partition with a new seed and different numbers, but this may change the 'characteristics' of the whole sequence. My advice: don't use this approach.
  • leo9r, over 2 years ago
    @d-xa thanks for your comment. Could you recommend an alternative approach?
  • d-xa, over 2 years ago
    If one uses this approach, I would suggest fixing the number of partitions of myRDD to 1.