Why is this simple Spark program not utilizing multiple cores?


Solution 1

Probably because the call to sc.parallelize puts all the data into a single partition. You can specify the number of partitions as the second argument to parallelize:

part = 16
count = sc.parallelize(xrange(N), part).map(sample).reduce(lambda a, b: a + b)

Note that this would still generate the 12.5 million points with one CPU in the driver and then only spread them out to 16 partitions to perform the reduce step.

A better approach is to do most of the work after the partitioning: for example, the following generates only a tiny array on the driver and then lets each remote task generate the actual random numbers and the subsequent pi approximation:

part = 16
count = ( sc.parallelize([0] * part, part)
           .flatMap(lambda blah: [sample(p) for p in xrange(N / part)])
           .reduce(lambda a, b: a + b)
       )

Finally (because the lazier we are the better), Spark MLlib already comes with random data generation that is nicely parallelized; have a look here: http://spark.apache.org/docs/1.1.0/mllib-statistics.html#random-data-generation. So maybe the following is close to what you are trying to do (not tested, so it probably won't work as-is, but it should hopefully be close):

from pyspark.mllib.random import RandomRDDs

count = ( RandomRDDs.uniformRDD(sc, N, part)
          .zip(RandomRDDs.uniformRDD(sc, N, part))
          .filter(lambda (x, y): x*x + y*y < 1)
          .count()
        )

Solution 2

As none of the above really worked for me (maybe because I didn't really understand them), here are my two cents.

I was starting my job with spark-submit program.py, and inside the file I had sc = SparkContext("local", "Test"). I tried to verify the number of cores Spark sees with sc.defaultParallelism. It turned out to be 1. When I changed the context initialization to sc = SparkContext("local[*]", "Test"), it became 16 (the number of cores on my system) and my program used all the cores.

I am quite new to Spark, but my understanding is that local by default indicates the use of one core, and since it is set inside the program, it overrides the other settings (in my case it certainly overrode those from configuration files and environment variables).
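
As a quick illustration, here is a minimal sketch of that check (not from the original answer; the app name "Test" is just a placeholder, and the getConf().get call with a fallback value is one way to peek at spark.default.parallelism, which the question also asks about):

from pyspark import SparkContext

# "local[*]" asks for as many worker threads as there are cores on the machine;
# plain "local" runs everything on a single thread.
sc = SparkContext("local[*]", "Test")

# Number of partitions Spark uses by default, e.g. for sc.parallelize().
print sc.defaultParallelism

# The property itself may not be set explicitly, so provide a fallback value.
print sc.getConf().get("spark.default.parallelism", "not explicitly set")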

Solution 3

To change the number of CPU cores consumed, set the number of cores to be used by the workers in the spark-env.sh file in spark-installation-directory/conf. This is done with the SPARK_EXECUTOR_CORES attribute in the spark-env.sh file; the value is set to 1 by default.
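
For example, the relevant line in spark-env.sh might look like this (a sketch; 8 is an arbitrary value, choose whatever fits your hardware):

# spark-installation-directory/conf/spark-env.sh
# Number of cores to use per executor (the default mentioned above is 1).
SPARK_EXECUTOR_CORES=8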

Solution 4

I tried the method mentioned by @Svend, but it still did not work.

The following works for me:

Do NOT use the local URL, for example:

sc = SparkContext("local", "Test App")

Use the master URL like this:

sc = SparkContext("spark://your_spark_master_url:port", "Test App")


Comments

  • MetallicPriest almost 2 years

    So, I'm running this simple program on a 16-core system. I run it by issuing the following.

    spark-submit --master local[*] pi.py
    

    And the code of that program is the following.

    #"""pi.py"""
    from pyspark import SparkContext
    import random
    
    N = 12500000
    
    def sample(p):
        x, y = random.random(), random.random()
        return 1 if x*x + y*y < 1 else 0
    
    sc = SparkContext("local", "Test App")
    count = sc.parallelize(xrange(0, N)).map(sample).reduce(lambda a, b: a + b)
    print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
    

    When I use top to see CPU consumption, only 1 core is being utilized. Why is that? Secondly, the Spark documentation says that the default parallelism is contained in the property spark.default.parallelism. How can I read this property from within my Python program?