PySpark: How to Read Many JSON Files, Multiple Records Per File


Solution 1

The previous answers are not going to read the files in a distributed fashion (see reference). To do so, you would need to parallelize the S3 keys and then read the files inside a flatMap step, as below.

import boto3
import json
from pyspark.sql import Row

def distributedJsonRead(s3Key):
    # Fetch the object from S3 and parse its JSON body
    s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=s3Key)
    contents = json.loads(s3obj.get()['Body'].read().decode('utf-8'))
    # Emit one Row per record in the file's "interactions" list
    for record in contents['interactions']:
        yield Row(**record)

pkeys = sc.parallelize(keyList) #keyList is a list of s3 keys
dataRdd = pkeys.flatMap(distributedJsonRead)

Boto3 Reference
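
In case it helps, here is one way keyList could be built with boto3; this is just a sketch, and the bucket name and 'data/' prefix are placeholders:

import boto3

#Sketch: collect the key of every JSON object under a given prefix
s3 = boto3.resource('s3')
bucket = s3.Bucket('bucketName')
keyList = [obj.key for obj in bucket.objects.filter(Prefix='data/')
           if obj.key.endswith('.json')]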

Solution 2

What about using DataFrames?

Does testFrame = sqlContext.read.json('s3n://<bucket>/<key>') give you what you want from one file?

Does every observation have the same "columns" (# of keys)?

If so, you could use boto to list each object you want to add, read them in, and union them with each other.

from pyspark.sql import SQLContext
import boto3
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

s3 = boto3.resource('s3')
bucket = s3.Bucket('<bucket>')

aws_secret_access_key = '<secret>'
aws_access_key_id = '<key>'

#Configure spark with your S3 access keys
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_access_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_secret_access_key)
#List every object in the bucket and build an s3n path for each one
object_list = [o for o in bucket.objects.all()]

paths = ['s3n://' + o.bucket_name + '/' + o.key for o in object_list]

#Read each file into its own DataFrame, then union them all together
dataframes = [sqlContext.read.json(path) for path in paths]

df = dataframes[0]
for frame in dataframes[1:]:
    df = df.unionAll(frame)

I'm new to Spark myself, so I'm wondering if there's a better way to use DataFrames with a lot of S3 files, but so far this is working for me.
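
One possible shortcut, since the reader resolves paths through Hadoop, is to hand read.json a glob instead of looping and unioning; this is only a sketch, assuming your files share a common prefix and your Spark version supports it:

#Sketch: read every JSON file under one prefix in a single call via a wildcard path
#('data/' is a placeholder for your own prefix)
df = sqlContext.read.json('s3n://<bucket>/data/*.json')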

Solution 3

The name is misleading (because it's singular), but sparkContext.textFile() (at least in the Scala case) also accepts a directory name or a wildcard path, so you should just be able to say textFile("/my/dir/*.json").
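
In PySpark the same idea would look roughly like the following (the bucket and prefix are placeholders); keep in mind that textFile splits on newlines, so this is only a direct fit when each line is a complete JSON record:

import json

#Wildcard paths are expanded by Hadoop, so a glob picks up every matching file
lines = sc.textFile('s3n://<bucket>/data/*.json')
records = lines.map(json.loads)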


Comments

  • Admin, almost 2 years ago

    I have a large dataset stored in an S3 bucket, but instead of being a single large file, it's composed of many (113K, to be exact) individual JSON files, each of which contains 100-1000 observations. These observations aren't at the top level, but require some navigation within each JSON to access, i.e. json["interactions"] is a list of dictionaries.

    I'm trying to use Spark/PySpark (version 1.1.1) to parse through and reduce this data, but I can't figure out the right way to load it into an RDD, because it's neither all records in one file (in which case I'd use sc.textFile, though with the added complication here of JSON) nor one record per file (in which case I'd use sc.wholeTextFiles).

    Is my best option to use sc.wholeTextFiles and then use a map (or in this case flatMap?) to pull the multiple observations out from under a single filename key and into elements of their own (rough sketch at the end of this comment)? Or is there an easier way to do this that I'm missing?

    I've seen answers here that suggest just using json.loads() on all files loaded via sc.textFile, but it doesn't seem like that would work for me because the JSONs aren't simple top-level lists.
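
    Roughly, the wholeTextFiles + flatMap approach I have in mind would look like this (the bucket and prefix are placeholders):

    import json

    # Each element is a (filename, file contents) pair; parse the JSON and
    # pull the observations out from under the "interactions" key
    files = sc.wholeTextFiles('s3n://<bucket>/<prefix>/*.json')
    observations = files.flatMap(lambda kv: json.loads(kv[1])['interactions'])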