Spark lists all leaf nodes even in partitioned data


Solution 1

As soon as Spark is given a directory to read from, it issues a call to listLeafFiles (org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala). This in turn calls fs.listStatus, which makes an API call to get the list of files and directories; the method is then called again for each directory, recursively, until no directories are left. By design this works well on HDFS, but it works badly on S3, since every directory listing is a remote call. S3, on the other hand, supports fetching all objects by prefix, which is exactly what we need.

So, for example, if we had the directory structure above with one year's worth of data, one directory per hour, and 10 subdirectories each, we would make 365 * 24 * 10 = ~87,600 API calls. This can be reduced to about 138 API calls, given that there are only 137,000 files and each S3 list call returns at most 1,000 keys.
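
As a rough illustration of where those numbers come from (a sketch only; the directory and file counts are the assumptions from the example above):

    // Rough cost model for the two listing strategies described above.
    object ListingCost {
      val directories     = 365 * 24 * 10   // 87,600 directories: one per hour, times 10 sub-directories
      val recursiveCalls  = directories     // recursive listStatus: one remote call per directory
      val totalFiles      = 137000
      val keysPerListCall = 1000            // S3 returns at most 1,000 keys per list request
      val prefixCalls     = math.ceil(totalFiles.toDouble / keysPerListCall).toInt  // ~137-138 list calls

      def main(args: Array[String]): Unit =
        println(s"recursive: $recursiveCalls calls vs prefix listing: ~$prefixCalls calls")
    }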

Code: org/apache/hadoop/fs/s3a/S3AFileSystem.java

public FileStatus[] listStatusRecursively(Path f) throws FileNotFoundException,
            IOException {
        String key = pathToKey(f);
        if (LOG.isDebugEnabled()) {
            LOG.debug("List status for path: " + f);
        }

        final List<FileStatus> result = new ArrayList<FileStatus>();
        final FileStatus fileStatus =  getFileStatus(f);

        if (fileStatus.isDirectory()) {
            if (!key.isEmpty()) {
                key = key + "/";
            }

            ListObjectsRequest request = new ListObjectsRequest();
            request.setBucketName(bucket);
            request.setPrefix(key);
            request.setMaxKeys(maxKeys);

            if (LOG.isDebugEnabled()) {
                LOG.debug("listStatus: doing listObjects for directory " + key);
            }

            ObjectListing objects = s3.listObjects(request);
            statistics.incrementReadOps(1);

            while (true) {
                for (S3ObjectSummary summary : objects.getObjectSummaries()) {
                    Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir);
                    // Skip over keys that are ourselves and old S3N _$folder$ files
                    if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Ignoring: " + keyPath);
                        }
                        continue;
                    }

                    if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
                        result.add(new S3AFileStatus(true, true, keyPath));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding: fd: " + keyPath);
                        }
                    } else {
                        result.add(new S3AFileStatus(summary.getSize(),
                                dateToLong(summary.getLastModified()), keyPath,
                                getDefaultBlockSize(f.makeQualified(uri, workingDir))));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding: fi: " + keyPath);
                        }
                    }
                }

                for (String prefix : objects.getCommonPrefixes()) {
                    Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
                    if (keyPath.equals(f)) {
                        continue;
                    }
                    result.add(new S3AFileStatus(true, false, keyPath));
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding: rd: " + keyPath);
                    }
                }

                if (objects.isTruncated()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("listStatus: list truncated - getting next batch");
                    }

                    objects = s3.listNextBatchOfObjects(objects);
                    statistics.incrementReadOps(1);
                } else {
                    break;
                }
            }
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Adding: rd (not a dir): " + f);
            }
            result.add(fileStatus);
        }

        return result.toArray(new FileStatus[result.size()]);
    }

Code: org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala

def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
    logTrace(s"Listing ${status.getPath}")
    val name = status.getPath.getName.toLowerCase
    if (shouldFilterOut(name)) {
      Array.empty[FileStatus]
    } else {
      val statuses = {
        val stats = if (fs.isInstanceOf[S3AFileSystem]) {
          // Monkey-patched path: one prefixed listing of the whole subtree
          // instead of a recursive listStatus walk with one call per directory.
          logWarning("Using monkey-patched version of list status")
          fs.asInstanceOf[S3AFileSystem].listStatusRecursively(status.getPath)
        } else {
          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
        }
        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
      }
      // statuses do not have any dirs.
      statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
        case f: LocatedFileStatus => f

        // NOTE:
        //
        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
        //   operations, calling `getFileBlockLocations` does no harm here since these file system
        //   implementations don't actually issue RPC for this method.
        //
        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
        //   be a big deal since we always use `listLeafFilesInParallel` when the number of
        //   paths exceeds threshold.
        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
      }
    }
  }
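
To sanity-check the patched listing outside of Spark, the new method can also be called directly, e.g. from spark-shell. This is only a sketch: it assumes the rebuilt hadoop-aws jar containing listStatusRecursively is on the classpath, that s3a credentials are already configured, and the bucket/prefix below is a placeholder.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.fs.s3a.S3AFileSystem

    val root = new Path("s3a://bucket/events_v3/")      // placeholder path
    val fs = root.getFileSystem(new Configuration())

    fs match {
      case s3a: S3AFileSystem =>
        // One prefixed listObjects walk instead of one listStatus call per directory.
        val leaves = s3a.listStatusRecursively(root)
        println(s"found ${leaves.length} entries")
      case other =>
        println(s"not an S3A filesystem: ${other.getClass.getName}")
    }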

Solution 2

To clarify Gaurav's answer, that code snippet is from Hadoop branch-2, so it will probably not surface until Hadoop 2.9 (see HADOOP-13208); and someone needs to update Spark to use that feature (which won't harm code using HDFS, it just won't show any speedup there).

One thing to consider is: what makes a good file layout for Object Stores.

  • Don't have deep directory trees with only a few files per directory.
  • Do have shallow trees with many files per directory (see the sketch after this list).
  • Consider using the first few characters of a file name for the most frequently changing value (such as day/hour), rather than the last. Why? Some object stores appear to use the leading characters for their hashing, not the trailing ones; if you give your names more uniqueness, they get spread out over more servers, with better bandwidth and less risk of throttling.
  • If you are using the Hadoop 2.7 libraries, switch to s3a:// over s3n://. It's already faster, and getting better every week, at least in the ASF source tree.
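
As a rough sketch of the shallow-tree advice (the paths and the event_ts column name are illustrative assumptions, not taken from the question), one way to repartition the example dataset is by month rather than by date and hour, so that far fewer directories, each holding more files, have to be listed:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, date_format}

    val spark = SparkSession.builder.getOrCreate()

    // Hypothetical source and target paths.
    val events = spark.read.parquet("s3a://bucket/events_raw/")

    events
      .withColumn("event_month", date_format(col("event_ts"), "yyyy-MM"))
      .write
      .partitionBy("event_month")       // one directory per month instead of per date/hour
      .parquet("s3a://bucket/events_by_month/")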

Finally, Apache Hadoop, Apache Spark and related projects are all open source. Contributions are welcome. That's not just the code, it's documentation, testing, and, for this performance stuff, testing against your actual datasets. Even giving us details about what causes problems (and your dataset layouts) is interesting.


Comments

  • Gaurav Shah
    Gaurav Shah almost 2 years

    I have parquet data partitioned by date & hour, folder structure:

    events_v3
      -- event_date=2015-01-01
        -- event_hour=2015-01-1
          -- part10000.parquet.gz
      -- event_date=2015-01-02
        -- event_hour=5
          -- part10000.parquet.gz
    

    I have created a table raw_events via Spark, but when I try to query it, it scans all the directories for footers, and that slows down the initial query, even if I am querying only one day's worth of data.

    query: select * from raw_events where event_date='2016-01-01'

    similar problem: http://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3CCAAswR-7Qbd2tdLSsO76zyw9tvs-Njw2YVd36bRfCG3DKZrH0tw@mail.gmail.com%3E (but it's old)

    Log:

    App > 16/09/15 03:14:03 main INFO HadoopFsRelation: Listing leaf files and directories in parallel under: s3a://bucket/events_v3/
    

    and then it spawns 350 tasks, since there are 350 days' worth of data.

    I have disabled schemaMerge and have also specified the schema to read, so it should be able to go straight to the partition I am looking at. Why should it list all the leaf files? Listing leaf files with 2 executors takes 10 minutes, while the actual query execution takes only 20 seconds.

    code sample:

    val sparkSession = org.apache.spark.sql.SparkSession.builder.getOrCreate()
    val df = sparkSession.read.option("mergeSchema","false").format("parquet").load("s3a://bucket/events_v3")
    df.createOrReplaceTempView("temp_events")
    sparkSession.sql(
      """
        |select verb, count(*) from temp_events where event_date = "2016-01-01" group by verb
      """.stripMargin).show()
    
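    One way to confirm that the partition filter is at least being applied (a diagnostic sketch, not part of the original question) is to print the physical plan instead of running the query; the slow part is still the initial listing that builds the file catalog:

    // Check whether the event_date predicate shows up as a partition filter on
    // the scan rather than as a plain data filter. (Diagnostic sketch only.)
    sparkSession.sql(
      """
        |select verb, count(*) from temp_events
        |where event_date = "2016-01-01"
        |group by verb
      """.stripMargin).explain()
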
    • zero323
      zero323 over 7 years
    • Gaurav Shah
      Gaurav Shah over 7 years
      I am not using Hive at all, just Spark and Spark SQL.
    • Gaurav Shah
      Gaurav Shah over 7 years
      @lostinoverflow I still did not find out why we read recursively, but I was able to bring the initial scan down from 10 minutes to 1 minute, effectively reducing the query to less than 2 minutes.
    • Igor Berman
      Igor Berman over 7 years
      Btw, what Spark version are you using? There are a few bugs fixed in Spark 2.0 around predicate pushdown; might be related.
    • Gaurav Shah
      Gaurav Shah over 7 years
      using spark 2.0.0
    • Admin
      Admin over 7 years
      Can you provide some details on how you did that?
    • Gaurav Shah
      Gaurav Shah over 7 years
      @LostInOverflow Spark creates a catalog of paths when we try to query, which internally lists all folders recursively. It first makes a call to get the list of folders, then queries each folder again, and so on recursively. This process is very slow on S3. I moved Spark's recursive call into the S3 filesystem, where I can ask S3 to give all files with the prefix "events_v3/", effectively getting all files recursively. It reduces 48,000 API calls to 300 API calls in my case.
    • Admin
      Admin over 7 years
      @GauravShah Could you post it as an answer? I'd like to award a bounty if there isn't a better solution.
    • Igor Berman
      Igor Berman over 7 years
      @GauravShah, can you try Spark 2.0.1? There are a few Parquet JIRAs that were fixed there...
  • Gaurav Shah
    Gaurav Shah over 7 years
    They have backported this fix into 2.8.0, which should be out in a couple of weeks :)
  • stevel
    stevel over 7 years
    Don't know about timetables; nobody has started that release process yet. I do believe it's shipping in HDP-2.5, and since I will get the support calls if it's not working, I'll be the one dealing with them. As and when the 2.8 RC process begins, testing will help. Spark doesn't pick up any speedup anyway, as it needs to be tweaked too, and there are other things to look at. Make your life easier by laying out data in fewer directories, such as by month, not day.