How can I read in a binary file from hdfs into a Spark dataframe?
Solution 1
For anyone who, like me, is starting out with Spark and stumbles upon binary files, here is how I solved it:
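```python
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Record layout: three big-endian int32 fields, one float64, one int16
dt = np.dtype([('idx_metric', '>i4'), ('idx_resource', '>i4'), ('date', '>i4'),
               ('value', '>f8'), ('pollID', '>i2')])
schema = StructType([StructField('idx_metric', IntegerType(), False),
                     StructField('idx_resource', IntegerType(), False),
                     StructField('date', IntegerType(), False),
                     StructField('value', DoubleType(), False),
                     StructField('pollID', IntegerType(), False)])

# binaryFiles yields one (path, content) pair per file
filenameRdd = sc.binaryFiles('hdfs://nameservice1:8020/user/*.binary')

def read_array(record):
    content = bytes(record[1])
    # content = zlib.decompress(content, 15 + 32)  # in case also zipped (needs import zlib)
    array = np.frombuffer(content[20:], dtype=dt)  # remove header (20 bytes)
    array = array.astype(array.dtype.newbyteorder('='))  # big endian -> native byte order
    return array.tolist()

unzipped = filenameRdd.flatMap(read_array)
bin_df = spark.createDataFrame(unzipped, schema)  # sqlContext.createDataFrame on older versions
```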
And now you can do whatever fancy stuff you want in Spark with your dataframe.
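Before pointing this at a cluster, the decoding step can be tried locally. The sketch below feeds `read_array` a fake `(path, content)` pair shaped like what `sc.binaryFiles` yields; the path, the row values, and the helper `make_fake_file` are hypothetical. It leaves out the byte-order conversion: NumPy already decodes the `'>i4'`/`'>f8'` fields correctly, and `tolist()` returns plain Python numbers, so the array's byte order no longer matters at that point.

```python
import struct

import numpy as np

# Record layout from the answer above: big-endian int32 x3, float64, int16.
dt = np.dtype([('idx_metric', '>i4'), ('idx_resource', '>i4'), ('date', '>i4'),
               ('value', '>f8'), ('pollID', '>i2')])
HEADER_BYTES = 20  # assumption: fixed-size header, as in the answer


def read_array(record):
    """Decode one (path, content) pair as produced by sc.binaryFiles."""
    _path, content = record
    array = np.frombuffer(bytes(content)[HEADER_BYTES:], dtype=dt)
    return array.tolist()  # list of tuples of plain Python int/float


def make_fake_file(rows):
    """Hypothetical test data: a zeroed 20-byte header followed by packed rows."""
    body = b''.join(struct.pack('>iiidh', *row) for row in rows)
    return b'\x00' * HEADER_BYTES + body


rows = [(1, 2, 20220612, 3.5, 7), (4, 5, 20220613, -1.25, 8)]
fake = ('hdfs://example/user/test.binary', make_fake_file(rows))
print(read_array(fake))
# [(1, 2, 20220612, 3.5, 7), (4, 5, 20220613, -1.25, 8)]
```

The `'>iiidh'` format packs without padding (22 bytes per row), which matches the unaligned NumPy dtype.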
Solution 2
Edit: Please review the use of sc.binaryFiles as mentioned here: https://stackoverflow.com/a/28753276/5088142
Try using the full URI, including the host name and port:
`hdfs://machine_host_name:8020/user/bin_file1.bin`
You can find the host name in the `fs.defaultFS` property in `core-site.xml`.
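If you would rather not copy the host name by hand, a small sketch like the following can read `fs.defaultFS` and build the full URI. The XML string stands in for a real `core-site.xml` (usually found in the Hadoop configuration directory); its contents and the helper names are hypothetical.

```python
import xml.etree.ElementTree as ET

# Hypothetical core-site.xml content; replace with the real file's text.
CORE_SITE = """<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://machine_host_name:8020</value>
  </property>
</configuration>
"""


def default_fs(core_site_xml):
    """Return the value of fs.defaultFS from core-site.xml text, or None."""
    root = ET.fromstring(core_site_xml)
    for prop in root.iter('property'):
        if prop.findtext('name') == 'fs.defaultFS':
            return prop.findtext('value').strip()
    return None


def hdfs_uri(core_site_xml, path):
    """Join the default filesystem URI with an absolute HDFS path."""
    base = default_fs(core_site_xml)
    if base is None:
        raise ValueError('fs.defaultFS not found in core-site.xml')
    return base.rstrip('/') + '/' + path.lstrip('/')


print(hdfs_uri(CORE_SITE, '/user/bin_file1.bin'))
# hdfs://machine_host_name:8020/user/bin_file1.bin
```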
Solution 3
Since Spark 3.0, Spark supports a binary file data source, which reads binary files and converts each file into a single record containing the file's raw content and metadata.
https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
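A minimal sketch of decoding records on top of the `binaryFile` source, assuming Spark 3.0 or later and the record layout and 20-byte header from Solution 1. The helper names and the directory path are illustrative. The schema is passed as a DDL string, so no extra imports are needed; unlike Solution 1, the resulting columns are nullable.

```python
import numpy as np

# Same record layout as in Solution 1 (big-endian, 20-byte header);
# adjust both constants to your own file format.
DT = np.dtype([('idx_metric', '>i4'), ('idx_resource', '>i4'), ('date', '>i4'),
               ('value', '>f8'), ('pollID', '>i2')])
HEADER_BYTES = 20
SCHEMA_DDL = 'idx_metric INT, idx_resource INT, date INT, value DOUBLE, pollID INT'


def parse_content(content):
    """Turn the raw bytes of one file into a list of row tuples."""
    return np.frombuffer(bytes(content)[HEADER_BYTES:], dtype=DT).tolist()


def load_binary_dir(spark, path, glob='*.binary'):
    """Read every matching file with the binaryFile source (Spark >= 3.0)."""
    files = (spark.read.format('binaryFile')
             .option('pathGlobFilter', glob)
             .load(path))
    # Each row has path, modificationTime, length and content columns;
    # only content is needed to decode the records.
    rows = files.select('content').rdd.flatMap(lambda r: parse_content(r.content))
    return spark.createDataFrame(rows, SCHEMA_DDL)
```

Usage would look like `df = load_binary_dir(spark, 'hdfs://nameservice1:8020/user/')`. Keeping `parse_content` free of Spark makes it easy to test on local bytes.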
WilliamEllisWebb
Updated on June 12, 2022
I am trying to port some code from pandas to (Py)Spark. Unfortunately, I am already stuck at the input step, where I want to read in binary data and put it into a Spark DataFrame.
So far I am using `fromfile` from numpy:
```python
import numpy as np
import pandas as pd

dt = np.dtype([('val1', '<i4'), ('val2', '<i4'), ('val3', '<i4'), ('val4', 'f8')])
data = np.fromfile('binary_file.bin', dtype=dt)
data = data[1:]  # throw away header
df_bin = pd.DataFrame(data, columns=data.dtype.names)
```
But for Spark I couldn't find how to do this. My workaround so far has been to use CSV files instead of the binary file, but that is not an ideal solution. I am aware that I shouldn't use numpy's `fromfile` with Spark. How can I read in a binary file that is already loaded into HDFS? I tried something like
```python
fileRDD = sc.parallelize(['hdfs:///user/bin_file1.bin', 'hdfs:///user/bin_file2.bin'])
fileRDD.map(lambda x: ???)
```
But it gives me a `No such file or directory` error. I have seen this question: spark in python: creating an rdd by loading binary data with numpy.fromfile, but that only works if the files are stored in the home directory of the driver node.
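The `No such file or directory` error is consistent with `np.fromfile` (like Python's `open`) expecting a local filesystem path; an `hdfs://` URI means nothing to it. The answers above sidestep that by letting Spark read the files and then decoding the bytes with `np.frombuffer`. The local check below, using a hypothetical temporary file in the question's layout (one header record followed by data records), suggests both calls decode the same data identically.

```python
import os
import tempfile

import numpy as np

dt = np.dtype([('val1', '<i4'), ('val2', '<i4'), ('val3', '<i4'), ('val4', 'f8')])

# Hypothetical file: one header record followed by two data records.
records = np.array([(0, 0, 0, 0.0), (1, 2, 3, 4.5), (5, 6, 7, 8.25)], dtype=dt)
with tempfile.NamedTemporaryFile(suffix='.bin', delete=False) as f:
    f.write(records.tobytes())
    path = f.name

from_file = np.fromfile(path, dtype=dt)[1:]  # needs a local path

with open(path, 'rb') as f:
    raw = f.read()  # in Spark, sc.binaryFiles would supply these bytes instead
from_bytes = np.frombuffer(raw, dtype=dt)[1:]  # decodes an in-memory buffer

os.remove(path)
print(from_file.tolist() == from_bytes.tolist())  # True
```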