Spark Streaming on an S3 Directory


In order to stream an S3 bucket, you need to provide the path to the bucket, and Spark will stream all data from all the files in it. Then, whenever a new file is created in the bucket, it will be streamed. If you append data to an existing file that has already been read, the new updates will not be read.

Here is a small piece of code that works:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
val sc = new SparkContext(conf)

// Point the S3 filesystem at the native S3 connector and hand it your AWS credentials
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)

// The fs.s3 keys above may be deprecated, so set the fs.s3n variants as well
hadoopConf.set("fs.s3n.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3n.awsSecretAccessKey", mySecretKey)

// 60-second batch interval; watch the bucket for newly created files
val ssc = new StreamingContext(sc, Seconds(60))
val lines = ssc.textFileStream("s3n://path to bucket")
lines.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
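
Note that the fs.s3/fs.s3n connector and these credential keys are deprecated in newer Hadoop builds. As a rough sketch, assuming the hadoop-aws package (and its AWS SDK dependency) is on the classpath, the same setup with the s3a connector would look something like the following; the endpoint setting is only needed for S3-compatible stores such as MinIO:

// Sketch only: assumes hadoop-aws and its AWS SDK are on the classpath
hadoopConf.set("fs.s3a.access.key", myAccessKey)
hadoopConf.set("fs.s3a.secret.key", mySecretKey)
// Only for S3-compatible stores (e.g. MinIO); plain AWS S3 does not need it
// hadoopConf.set("fs.s3a.endpoint", "http://localhost:9000")

val s3aLines = ssc.textFileStream("s3a://path to bucket")
s3aLines.print()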

Hope it will help.


Comments

  • Admin
    Admin almost 2 years

    So I have thousands of events being streamed through Amazon Kinesis into SQS and then dumped into an S3 directory. About every 10 minutes, a new text file is created to dump the data from Kinesis into S3. I would like to set up Spark Streaming so that it streams the new files being dumped into S3. Right now I have

    import org.apache.spark.streaming._
    // assuming sc is an existing SparkContext and a 60-second batch interval
    val ssc = new StreamingContext(sc, Seconds(60))
    val currentFileStream = ssc.textFileStream("s3://bucket/directory/event_name=accepted/")
    currentFileStream.print()
    ssc.start()

    However, Spark Streaming is not picking up the new files being dumped into S3. I think it has something to do with the file write requirements:

    The files must have the same data format.
    The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
    Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
    

    Why is Spark Streaming not picking up the new files? Is it because AWS is creating the files in the directory and not moving them? How can I make sure Spark picks up the files being dumped into S3?

  • Admin
    Admin almost 9 years
    Hey Hafiz, this does not work for me. I set these configurations but it is still not picking up the new files. I can get the old files with sc.textFile("") but no streamed files.
  • Hafiz Mujadid
    Hafiz Mujadid almost 9 years
    Do you get any exception? What do your application logs say?
  • Admin
    Admin almost 9 years
    Nope, no errors. It simply does not pick up the new files.
  • lfk
    lfk almost 6 years
    Is it possible to stream multiple buckets?
  • Jijo
    Jijo about 3 years
    Thank you, this worked for me, but I used s3a with MinIO.