Connect to S3 data from PySpark


Solution 1

I solved this by adding --packages org.apache.hadoop:hadoop-aws:2.7.1 to the spark-submit command.

It downloads the missing Hadoop AWS packages that let Spark jobs read from and write to S3.
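If you build the context from Python instead of going through spark-submit, a rough equivalent is to put the same coordinate into the spark.jars.packages property. A minimal sketch, assuming Spark can reach a Maven repository to resolve the artifact (the app name is a placeholder):

import pyspark

# Equivalent of "spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.1":
# ask Spark to resolve the hadoop-aws artifact (plus transitive deps) at startup.
conf = pyspark.SparkConf() \
    .setAppName("s3-example") \
    .set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.1")

sc = pyspark.SparkContext(conf=conf)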

Then, in your job, you need to set your AWS credentials like this:

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_key)

Another option for setting your credentials is to define them in spark/conf/spark-env.sh:

#!/usr/bin/env bash
AWS_ACCESS_KEY_ID='xxxx'
AWS_SECRET_ACCESS_KEY='xxxx'

SPARK_WORKER_CORES=1 # to set the number of cores to use on this machine
SPARK_WORKER_MEMORY=1g # to set how much total memory workers have to give executors (e.g. 1000m, 2g)
SPARK_EXECUTOR_INSTANCES=10 # to set the number of worker processes per node
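If you go the environment-variable route, one way to consume those values from the job itself is to copy them into the Hadoop configuration. A sketch, assuming the variables are exported and visible to the Python process:

import os
import pyspark

sc = pyspark.SparkContext.getOrCreate()

# Copy the AWS_* environment variables (e.g. from spark-env.sh) into the
# Hadoop configuration used by the s3n connector.
aws_id = os.environ.get("AWS_ACCESS_KEY_ID")
aws_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
if aws_id and aws_key:
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_id)
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_key)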


Solution 2

I would suggest going through this link.

In my case, I used instance profile credentials to access S3 data.

Instance profile credentials – used on EC2 instances, and delivered through the Amazon EC2 metadata service. The AWS SDK for Java uses the InstanceProfileCredentialsProvider to load these credentials.

Note

Instance profile credentials are used only if AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set. See EC2ContainerCredentialsProviderWrapper for more information.

For PySpark, I use the following settings to access S3 content:

import pyspark


def get_spark_context(app_name):
    # configure
    conf = pyspark.SparkConf().setAppName(app_name)

    # init & return
    sc = pyspark.SparkContext.getOrCreate(conf=conf)

    # s3a config: regional endpoint plus a credential provider chain that
    # tries the EC2 instance profile first, then a local AWS profile
    sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint',
                                      's3.eu-central-1.amazonaws.com')
    sc._jsc.hadoopConfiguration().set(
        'fs.s3a.aws.credentials.provider',
        'com.amazonaws.auth.InstanceProfileCredentialsProvider,'
        'com.amazonaws.auth.profile.ProfileCredentialsProvider'
    )

    return pyspark.SQLContext(sparkContext=sc)
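A usage sketch on top of that helper (the app name and s3a path are placeholders):

sql_context = get_spark_context('s3-reader')

# hypothetical path; reads JSON over the s3a connector
df = sql_context.read.json('s3a://nonamecpp/dataset.json')
df.show()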

More on SparkContext here.

Please refer to this for the different types of S3 access.


Comments

  • Elon Musk over 1 year

    I am trying to read a JSON file from Amazon S3 into a Spark context and use it to process the data.

    Spark is running inside a Docker container, so putting files on the Docker path is also a pain. Hence I pushed the file to S3.

    The code below shows the rest.

    from pyspark import SparkContext, SparkConf
    conf = SparkConf().setAppName("first")
    sc = SparkContext(conf=conf)
    
    config_dict = {"fs.s3n.awsAccessKeyId":"**",
                   "fs.s3n.awsSecretAccessKey":"**"}
    
    bucket = "nonamecpp"
    prefix = "dataset.json"
    filename = "s3n://{}/{}".format(bucket, prefix)
    rdd = sc.hadoopFile(filename,
                        'org.apache.hadoop.mapred.TextInputFormat',
                        'org.apache.hadoop.io.Text',
                        'org.apache.hadoop.io.LongWritable',
                        conf=config_dict)
    

    I get the following error -

    Py4JJavaError                             Traceback (most recent call last)
    <ipython-input-2-b94543fb0e8e> in <module>()
          9                     'org.apache.hadoop.io.Text',
         10                     'org.apache.hadoop.io.LongWritable',
    ---> 11                     conf=config_dict)
         12 
    
    /usr/local/spark/python/pyspark/context.pyc in hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
        558         jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
        559                                               valueClass, keyConverter, valueConverter,
    --> 560                                               jconf, batchSize)
        561         return RDD(jrdd, self)
        562 
    
    /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
        536         answer = self.gateway_client.send_command(command)
        537         return_value = get_return_value(answer, self.gateway_client,
    --> 538                 self.target_id, self.name)
        539 
        540         for temp_arg in temp_args:
    
    /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
        298                 raise Py4JJavaError(
        299                     'An error occurred while calling {0}{1}{2}.\n'.
    --> 300                     format(target_id, '.', name), value)
        301             else:
        302                 raise Py4JError(
    
    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopFile.
    : java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
        at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:73)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
        at org.apache.hadoop.fs.s3native.$Proxy20.initialize(Unknown Source)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
        at org.apache.spark.rdd.RDD.first(RDD.scala:1093)
        at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
        at org.apache.spark.api.python.PythonRDD$.hadoopFile(PythonRDD.scala:543)
        at org.apache.spark.api.python.PythonRDD.hadoopFile(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:744)
    

    I have clearly provided awsSecretAccessKey and awsAccessKeyId. What's going wrong?

  • Viv almost 7 years
    How do I read Parquet files from a folder on S3 with PySpark? The above code is not working for me when trying to read Parquet files.
  • Franzi almost 7 years
    Could you show how you are trying to read the Parquet files?
  • Viv almost 7 years
    sc = SparkContext.getOrCreate() sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'A') sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 's') sqlContext = SQLContext(sc) df2 = sqlContext.read.parquet("s3://path/to/folder")
  • Franzi almost 7 years
    As I see, you are reading from an s3 path instead of s3n. In that case you could try setting your keys with fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey.
  • stevel about 5 years
    FWIW, the S3A connector defaults to having the EC2 IAM credential provider in its list of suppliers (last in the list, as it's the slowest and can trigger throttling). The standard order is: secrets in the URL (bad; removed from the latest release), fs.s3a secret settings in XML or JCEKS files, env vars, IAM roles. spark-submit will also look for the AWS_ env vars and set the s3n and s3a key values from them.
  • yardstick17 about 5 years
    @SteveLoughran So far I have been processing 20 GB of data without any issue. But that's a good addition; I'll keep an eye on throttling.
  • stevel about 5 years
    The throttling is mostly about a bug in the early releases of InstanceProfileCredentialsProvider where every instance was unique; it has since moved over to a singleton shared across all threads. Otherwise, every filesystem client in a single JVM was hitting the AWS auth services, which are throttled like everything else.