Move file from one folder to another on HDFS in Scala / Spark
Solution 1
Try the following Scala code.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
val hadoopConf = new Configuration()
val hdfs = FileSystem.get(hadoopConf)

// srcFilePath points at a file on the local filesystem; destFilePath is on HDFS
val srcPath = new Path(srcFilePath)
val destPath = new Path(destFilePath)

// Copies a local file up to HDFS (for a move within HDFS, see fs.rename below)
hdfs.copyFromLocalFile(srcPath, destPath)
You should also check whether the HADOOP_CONF_DIR variable is set in the conf/spark-env.sh file, so that Spark can find the Hadoop configuration settings.
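Since the question asks about moving a file that is already on HDFS into another HDFS folder, a move within one filesystem can be sketched with FileSystem.rename, which signals failure by returning false rather than by throwing (the helper name below is hypothetical):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: move `src` into directory `destDir` on the same
// filesystem. rename returns false instead of throwing on failure, so
// the Boolean result must be checked by the caller.
def moveIntoDir(fs: FileSystem, src: Path, destDir: Path): Boolean =
  fs.rename(src, new Path(destDir, src.getName))
```

Appending src.getName makes the destination explicit; a false return usually means the source does not exist, the destination already exists, or a parent directory is missing.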
The dependencies for the build.sbt file:
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.6.0"
libraryDependencies += "org.apache.commons" % "commons-io" % "1.3.2"
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"
Or you can use IOUtils from Apache Commons to copy data from an InputStream to an OutputStream:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.commons.io.IOUtils

val hadoopconf = new Configuration()
val fs = FileSystem.get(hadoopconf)

// Create output stream to the destination HDFS file
val outFileStream = fs.create(new Path("hdfs://<namenode>:<port>/output_path"))

// Create input stream from the source HDFS file
val inStream = fs.open(new Path("hdfs://<namenode>:<port>/input_path"))

IOUtils.copy(inStream, outFileStream)

// Close both streams
inStream.close()
outFileStream.close()
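Hadoop also ships its own stream helper, org.apache.hadoop.io.IOUtils.copyBytes, which removes the extra commons-io dependency; a small sketch under the same assumptions (the helper name is hypothetical, and the three-argument overload closes both streams itself):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

// Hypothetical helper: copy `src` to `dest` on the same FileSystem.
// copyBytes(in, out, conf) buffers using io.file.buffer.size and closes
// both streams when it finishes, so no explicit close() calls are needed.
def copyFile(fs: FileSystem, src: Path, dest: Path, conf: Configuration): Unit = {
  val in  = fs.open(src)
  val out = fs.create(dest, true) // overwrite an existing destination
  IOUtils.copyBytes(in, out, conf)
}
```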
Solution 2
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = spark.sparkContext.hadoopConfiguration

// Resolve each FileSystem from its own path, so source and target may
// live on different filesystems
val srcFileSystem: FileSystem = new Path(new URI(sourceFile)).getFileSystem(conf)
val dstFileSystem: FileSystem = new Path(new URI(targetFile)).getFileSystem(conf)

FileUtil.copy(
  srcFileSystem,
  new Path(new URI(sourceFile)),
  dstFileSystem,
  new Path(new URI(targetFile)),
  true, // deleteSource = true removes the original, turning the copy into a move
  conf)
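The deleteSource flag above is what turns the copy into a move; wrapping the call makes that intent explicit and surfaces the Boolean result. A sketch under the same assumptions (the helper name is hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Hypothetical helper: move a file, possibly across filesystems.
// FileUtil.copy with deleteSource = true copies the file and then removes
// the source, returning false if the copy could not be completed.
def moveFile(srcFs: FileSystem, src: Path,
             dstFs: FileSystem, dst: Path,
             conf: Configuration): Boolean =
  FileUtil.copy(srcFs, src, dstFs, dst, /* deleteSource = */ true, conf)
```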
osk
Updated on June 28, 2022

Comments
-
osk almost 2 years
I have two paths, one for a file and one for a folder. I would like to move the file into that folder on HDFS. How can I do that in Scala? I'm using Spark, too
Bonus if the same code will work for Windows paths too, just like reading/writing files on HDFS, but not required.
I have tried the following:
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.moveFromLocalFile(something, something2)
And I get the following error:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs:/user/o/datasets/data.txt, expected: file:///
Same goes for
moveToLocalFile()
because they are meant to transfer files between filesystems, not within a filesystem. I have also tried
fs.rename()
but that did not do anything at all (no error or anything, either).
I basically create files in one directory (writing to them with a stream), and once they are done they need to be moved into a different directory. This different directory is monitored by Spark Streaming, and I have had some issues when Spark Streaming tries to work with unfinished files.
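The "Wrong FS" error above usually means the FileSystem handle was built from a configuration whose default filesystem (file:///) does not match the path's scheme (hdfs://). Obtaining the FileSystem from the path itself avoids the mismatch; a minimal sketch (the helper name is hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: resolve the FileSystem that actually owns `path`,
// so an hdfs:// path gets an HDFS client and a file:// path gets the
// local filesystem, regardless of what fs.defaultFS says.
def fsFor(path: String, conf: Configuration): FileSystem =
  new Path(path).getFileSystem(conf)
```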
-
osk over 6 yearsUnfortunately, the first solution doesn't work. How can I check whether
HADOOP_CONF_DIR
is set? Also, the second solution is not viable for my system. I basically create files in one directory (writing to them with a stream), and once they are done they need to be moved into a different directory. This different directory is monitored by Spark Streaming, and I have had some issues when Spark Streaming tries to work with unfinished files. -
OneCricketeer over 6 years@osk Your question has no mention of Spark... And
HADOOP_CONF_DIR
is an environment variable, so look up how to check environment variables on your OS; or, if you are using Spark, open the spark-env.sh file and set it there. -
Srinivas Bandaru almost 6 years@Sahil, I am working on the same solution and trying to find a way to copy a large dataset in a distributed manner. As I see it, IOUtils is a non-Hadoop package (org.apache.commons.io.IOUtils), so it may not work in a distributed manner. Could you please confirm whether IOUtils can perform a distributed file copy? I am trying to copy files from one HDFS directory to another HDFS directory on the same cluster.