ImportError: No module named numpy on spark workers


Solution 1

To use Spark in YARN client mode, you'll need to install any dependencies on the machines on which YARN starts the executors. That's the only surefire way to make this work.

Using Spark in YARN cluster mode is a different story. You can distribute Python dependencies with spark-submit:

spark-submit --master yarn-cluster --py-files my_dependency.zip my_script.py

However, the situation with numpy is complicated by the same thing that makes it so fast: the fact that it does the heavy lifting in C. Because of the way it is installed (with compiled, platform-specific extensions), you won't be able to distribute numpy in this fashion.
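
Whichever route you take, a quick way to confirm whether the executors can actually import numpy, and which interpreter they are running, is a throwaway diagnostic job along the lines of the sketch below (not part of either deployment mode, just a check):

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    def probe(_partition):
        # Runs on an executor: report its interpreter path and numpy availability.
        import sys
        try:
            import numpy
            yield (sys.executable, numpy.__version__)
        except ImportError:
            yield (sys.executable, "numpy missing")

    # Use enough partitions that every executor is likely to run at least one task.
    print(sc.parallelize(range(100), 20).mapPartitions(probe).distinct().collect())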

Solution 2

numpy is not installed on the worker (virtual) machines. If you use Anaconda, it is very convenient to upload such Python dependencies when deploying the application in cluster mode, so there is no need to install numpy or other modules on each machine; they just have to be present in your Anaconda environment. First, zip your Anaconda installation and put the zip file on the cluster, then you can submit a job using the following script.

 spark-submit \
 --master yarn \
 --deploy-mode cluster \
 --archives hdfs://host/path/to/anaconda.zip#python-env \
 --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python-env/anaconda/bin/python \
 app_main.py

YARN will copy anaconda.zip from the HDFS path to each worker and use python-env/anaconda/bin/python to execute tasks.
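
For what it's worth, the archive itself can be built with nothing more than the Python standard library; the local path /opt/anaconda below is only a placeholder for wherever your environment actually lives:

    import shutil

    # Produce anaconda.zip with a top-level "anaconda/" directory, so that after
    # YARN unpacks it under the "python-env" alias the interpreter is found at
    # python-env/anaconda/bin/python (matching the --conf value above).
    shutil.make_archive("anaconda", "zip", root_dir="/opt", base_dir="anaconda")

    # Then upload it to the cluster, e.g.:
    #   hdfs dfs -put anaconda.zip hdfs://host/path/to/anaconda.zip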

Refer to Running PySpark with Virtualenv for more information.

Solution 3

For me, the environment variable PYSPARK_PYTHON was not set, so I edited the /etc/environment file and added the path of my Python environment to the variable.

PYSPARK_PYTHON=/home/venv/python3

Afterwards, the error was gone.
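
If editing /etc/environment is not possible, the same variable can usually be set from the driver script itself, as long as that happens before the SparkContext is created; the interpreter path below is just an example and has to exist on every worker:

    import os
    from pyspark import SparkContext

    # Must be set before the SparkContext is created; the path has to exist on
    # every worker node, not only on the driver.
    os.environ["PYSPARK_PYTHON"] = "/home/venv/python3"

    sc = SparkContext.getOrCreate()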

Solution 4

What solved it for me (on Mac) was this guide, which also explains how to run Python through Jupyter Notebooks: https://medium.com/@yajieli/installing-spark-pyspark-on-mac-and-fix-of-some-common-errors-355a9050f735

In a nutshell (assuming you installed Spark with brew install apache-spark):

  1. Find the SPARK_PATH using brew info apache-spark
  2. Add these lines to your ~/.bash_profile:
# Spark and Python
######
export SPARK_PATH=/usr/local/Cellar/apache-spark/2.4.1
export PYSPARK_DRIVER_PYTHON="jupyter"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
# For Python 3, you have to add the line below or you will get an error
export PYSPARK_PYTHON=python3
alias snotebook='$SPARK_PATH/bin/pyspark --master local[2]'
######
  3. You should be able to open Jupyter Notebook simply by calling: pyspark

And just remember that you don't need to create a SparkContext manually; instead, simply call:

sc = SparkContext.getOrCreate()
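
As a quick sanity check that the PYSPARK_PYTHON=python3 line actually took effect, you can compare the driver's and the workers' Python versions from a notebook cell; this is purely illustrative:

    import sys
    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    def worker_version(_partition):
        # Runs on an executor and reports its Python version.
        import sys
        yield ".".join(str(p) for p in sys.version_info[:3])

    driver_version = ".".join(str(p) for p in sys.version_info[:3])
    worker_versions = sc.parallelize(range(4), 4).mapPartitions(worker_version).distinct().collect()

    # A mismatch here is what triggers PySpark's "different minor versions" error.
    print("driver :", driver_version)
    print("workers:", worker_versions)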
Author: ajkl (updated on July 05, 2022)

Comments

  • ajkl, almost 2 years ago

    Launching pyspark in client mode: bin/pyspark --master yarn-client --num-executors 60. The import numpy in the shell goes fine, but it fails in the KMeans step. My feeling is that somehow the executors do not have numpy installed. I didn't find any good solution anywhere for letting the workers know about numpy. I tried setting PYSPARK_PYTHON, but that didn't work either.

    import numpy

    # Load the precomputed feature matrix and distribute it across the cluster.
    features = numpy.load("combined_features.npz")
    features = features['arr_0']
    features.shape
    features_rdd = sc.parallelize(features, 5000)

    from pyspark.mllib.clustering import KMeans, KMeansModel
    from math import sqrt

    # Fails on the executors with "ImportError: No module named numpy".
    clusters = KMeans.train(features_rdd, 2, maxIterations=10, runs=10, initializationMode="random")

    Stack trace

     org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/hadoop/3/scratch/local/usercache/ajkale/appcache/application_1451301880705_525011/container_1451301880705_525011_01_000011/pyspark.zip/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/hadoop/3/scratch/local/usercache/ajkale/appcache/application_1451301880705_525011/container_1451301880705_525011_01_000011/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/hadoop/3/scratch/local/usercache/ajkale/appcache/application_1451301880705_525011/container_1451301880705_525011_01_000011/pyspark.zip/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
      File "/hadoop/3/scratch/local/usercache/ajkale/appcache/application_1451301880705_525011/container_1451301880705_525011_01_000011/pyspark.zip/pyspark/mllib/__init__.py", line 25, in <module>
    
    ImportError: No module named numpy
    
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
            at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
            at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
            at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
            at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
            at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
            at org.apache.spark.scheduler.Task.run(Task.scala:88)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)