Reading multiple files from S3 in parallel (Spark, Java)
Solution 1
The underlying problem is that listing objects in S3 is really slow, and the way S3 is made to look like a directory tree kills performance whenever something does a treewalk (as wildcard pattern matching of paths does).
The code in the post does an all-children (flat) listing, which performs far better. It's essentially what ships with Hadoop 2.8 as the s3a listFiles(path, recursive) call (see HADOOP-13208).
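To make the cost difference concrete, here is a rough Java model (an illustration, not a benchmark) that counts LIST requests for a treewalk versus a single flat listing. It assumes each listing call returns at most 1,000 keys (the S3 ListObjects page limit) and that a treewalk issues one listing call per directory-like prefix; pagination inside a single "directory" is ignored to keep it simple. The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ListingCostModel {
    // S3 ListObjects returns at most 1,000 keys per response page.
    static final int PAGE_SIZE = 1000;

    // Flat (recursive) listing: one request per page of keys under the prefix.
    public static long flatListingRequests(List<String> keys) {
        return Math.max(1, (keys.size() + PAGE_SIZE - 1) / PAGE_SIZE);
    }

    // Simplified treewalk: one request per "directory" (the root plus every
    // distinct parent prefix), ignoring pagination inside a directory.
    public static long treewalkRequests(List<String> keys) {
        Set<String> dirs = new HashSet<>();
        dirs.add("");
        for (String key : keys) {
            int slash = key.indexOf('/');
            while (slash >= 0) {
                dirs.add(key.substring(0, slash + 1));
                slash = key.indexOf('/', slash + 1);
            }
        }
        return dirs.size();
    }

    public static void main(String[] args) {
        List<String> keys = List.of(
            "logs/2016/01/a.txt", "logs/2016/01/b.txt",
            "logs/2016/02/c.txt", "logs/2017/01/d.txt");
        System.out.println("flat: " + flatListingRequests(keys));
        System.out.println("treewalk: " + treewalkRequests(keys));
    }
}
```

With the four sample keys this prints 1 flat request versus 7 treewalk requests; every extra request is another round trip to S3, and the gap grows with deeper or wider trees.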
Once you have the full listing, you have the object keys as strings, which you can map to s3a/s3n paths for Spark to handle as text file inputs and then apply work to:
val files = keys.map(key => s"s3a://$bucket/$key").mkString(",")
sc.textFile(files).map(...)
And as requested, here's the Java code used.
String prefix = "s3a://" + properties.get("s3.source.bucket") + "/";
objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix+summary.getKey()));
// repeat while objectListing truncated
JavaRDD<String> events = sc.textFile(String.join(",", keys));
Note that I switched s3n to s3a because, provided you have the hadoop-aws and amazon-sdk JARs on your classpath, the s3a connector is the one you should be using. It's better, and it's the one which gets maintained and tested against Spark workloads by people (me). See The history of Hadoop's S3 connectors.
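The "repeat while objectListing truncated" step in the Java code is where a loop belongs; with the v1 AWS SDK that loop checks objectListing.isTruncated() and fetches the next page with s3.listNextBatchOfObjects(objectListing). So that it runs without S3, the sketch below uses a hypothetical Page record and PageSource interface as stand-ins for ObjectListing and the client; only the loop shape and the s3a path building are the point.

```java
import java.util.ArrayList;
import java.util.List;

public class PaginatedKeys {
    // Hypothetical stand-in for the SDK's ObjectListing: one page of keys
    // plus a flag saying whether more pages follow.
    public record Page(List<String> keys, boolean truncated) {}

    // Hypothetical stand-in for the S3 client's listing calls.
    public interface PageSource {
        Page first();
        Page next(Page previous);
    }

    // Collects every key across all pages, prefixes it with s3a://bucket/,
    // and joins the result into one comma-separated textFile(...) argument.
    public static String textFileArgument(String bucket, PageSource source) {
        String prefix = "s3a://" + bucket + "/";
        List<String> paths = new ArrayList<>();
        Page page = source.first();
        while (true) {
            for (String key : page.keys()) {
                paths.add(prefix + key);
            }
            if (!page.truncated()) {
                break; // last page reached
            }
            page = source.next(page);
        }
        return String.join(",", paths);
    }

    public static void main(String[] args) {
        List<Page> pages = List.of(
            new Page(List.of("a.txt", "b.txt"), true),
            new Page(List.of("c.txt"), false));
        PageSource source = new PageSource() {
            int index = 0;
            public Page first() { index = 0; return pages.get(0); }
            public Page next(Page previous) { return pages.get(++index); }
        };
        // prints s3a://my-bucket/a.txt,s3a://my-bucket/b.txt,s3a://my-bucket/c.txt
        System.out.println(textFileArgument("my-bucket", source));
    }
}
```

The joined string is built in driver memory, so this approach suits hundreds or thousands of objects rather than billions.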
Solution 2
You can use sc.textFile to read multiple files: pass multiple file URLs as a single comma-separated argument. You can specify whole directories, use wildcards, and even give a comma-separated list of directories and wildcards.
Example:
sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
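As far as I know, sc.textFile hands this string to Hadoop's FileInputFormat, which splits it on commas except for commas inside a {...} glob group such as /logs/{2016,2017}/*. The sketch below is a simplified re-implementation of that splitting rule for illustration, not Hadoop's own code; splitInputPaths and the /logs path are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class InputPathSplitter {
    // Splits a comma-separated path string on top-level commas only;
    // commas inside {...} glob alternations stay part of the path.
    public static List<String> splitInputPaths(String commaSeparatedPaths) {
        List<String> paths = new ArrayList<>();
        int braceDepth = 0;
        int start = 0;
        for (int i = 0; i < commaSeparatedPaths.length(); i++) {
            char ch = commaSeparatedPaths.charAt(i);
            if (ch == '{') {
                braceDepth++;
            } else if (ch == '}' && braceDepth > 0) {
                braceDepth--;
            } else if (ch == ',' && braceDepth == 0) {
                paths.add(commaSeparatedPaths.substring(start, i));
                start = i + 1;
            }
        }
        paths.add(commaSeparatedPaths.substring(start));
        return paths;
    }

    public static void main(String[] args) {
        String arg = "/my/dir1,/my/paths/part-00[0-5]*,/logs/{2016,2017}/*,/a/specific/file";
        // prints the four path patterns, one per line
        splitInputPaths(arg).forEach(System.out::println);
    }
}
```

The practical consequence is that a comma inside a brace glob is safe, while a comma anywhere else starts a new path.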
Solution 3
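If you parallelize the keys and do the reading inside the tasks, the S3 reads run on the executors in parallel, which should improve performance:
import scala.collection.JavaConverters._, scala.io.Source
import com.amazonaws.auth.BasicAWSCredentials, com.amazonaws.services.s3.AmazonS3Client
val bucketName = "xxx"; val prefix = "xxx"
val keys = new AmazonS3Client(new BasicAWSCredentials("accessKeyId", "secretKey")).listObjects(bucketName, prefix).getObjectSummaries.asScala.map(_.getKey).toList // first page only (up to 1,000 keys)
val lines = sc.parallelize(keys).flatMap { key =>
  val s3 = new AmazonS3Client(new BasicAWSCredentials("accessKeyId", "secretKey")) // built inside the task: the client is not serializable
  Source.fromInputStream(s3.getObject(bucketName, key).getObjectContent).getLines
}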
Nira
Updated on July 07, 2022
Comments
Nira almost 2 years
I saw a few discussions on this but couldn't quite understand the right solution: I want to load a couple hundred files from S3 into an RDD. Here is how I'm doing it now:
ObjectListing objectListing = s3.listObjects(new ListObjectsRequest()
        .withBucketName(...)
        .withPrefix(...));
List<String> keys = new LinkedList<>();
objectListing.getObjectSummaries().forEach(summary -> keys.add(summary.getKey()));
// repeat while objectListing.isTruncated()
JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));
The ReadFromS3Function does the actual reading using the AmazonS3 client:
public Iterator<String> call(String s) throws Exception {
    AmazonS3 s3Client = getAmazonS3Client(properties);
    S3Object object = s3Client.getObject(new GetObjectRequest(...));
    InputStream is = object.getObjectContent();
    List<String> lines = new LinkedList<>();
    String str;
    try {
        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
        if (is != null) {
            while ((str = reader.readLine()) != null) {
                lines.add(str);
            }
        } else {
            ...
        }
    } finally {
        ...
    }
    return lines.iterator();
}
I kind of "translated" this from answers I saw for the same question in Scala. I think it's also possible to pass the entire list of paths to sc.textFile(...), but I'm not sure which is the best-practice way.
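Whatever ReadFromS3Function does in its elided finally block, the S3 object stream should be closed after reading; with the v1 AWS SDK an unclosed object stream can keep its HTTP connection tied up. The sketch below shows the same per-object read with try-with-resources, assuming UTF-8 text. ObjectOpener and readAllLines are hypothetical names; ObjectOpener stands in for the s3Client.getObject(...).getObjectContent() call so the example runs without S3.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ObjectLineReader {
    // Hypothetical stand-in for s3Client.getObject(...).getObjectContent().
    @FunctionalInterface
    public interface ObjectOpener {
        InputStream open(String key) throws IOException;
    }

    // Reads every line of one object; closing the reader also closes the
    // underlying stream, even if readLine() throws.
    public static Iterator<String> readAllLines(ObjectOpener opener, String key) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(opener.open(key), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines.iterator();
    }

    public static void main(String[] args) throws IOException {
        ObjectOpener fake = key -> new ByteArrayInputStream(
            ("first line of " + key + "\nsecond line").getBytes(StandardCharsets.UTF_8));
        readAllLines(fake, "events/part-0000").forEachRemaining(System.out::println);
    }
}
```

Returning an Iterator keeps the signature compatible with a Spark flatMap function like the one above, but, like the original, it still buffers a whole object in memory per call.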
Nira over 7 years
The thing about using wildcards is that it results in multiple calls to list(), which makes the Spark job appear unresponsive for a long time. That's why it's recommended to first get all the keys and then parallelize them. See here: tech.kinja.com/…
Nira over 7 years
Thanks for the answer, Steve. I'm trying to run the code on AWS EMR with both options (my custom map function, and passing all the paths to textFile(...)) but am having some trouble getting it to run properly. Once I manage to run it and compare performance, I will update this thread.
Nira over 7 years
Lovely! I tried it with 7 files of 1 GiB each and it works very well using textFile(...) (50% faster than with my custom code). So can you please update your reply with the corresponding Java code, and I will accept it?
String prefix = "s3n://" + properties.get("s3.source.bucket") + "/";
objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix + summary.getKey()));
// repeat while truncated
JavaRDD<String> events = sc.textFile(String.join(",", keys));
Nira over 7 years
Tried with s3a, works fine for me. It requires
--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2
as a parameter to spark-submit, and I think also
sc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
in the code. EMR should also have an IAM role with permissions to read from and write to the bucket. That way you can just do
AmazonS3 s3 = new AmazonS3Client();
and credentials will be picked up automatically.
TriCore over 6 years
This looks good. listObjects typically returns the results in batches, so I am assuming there would be a loop over this.
TriCore over 6 years
This won't work if there is a large number of keys, say billions.
samthebest about 3 years
Yeah, this code needs more logic to handle listings that are truncated.