How to assign unique contiguous numbers to elements in a Spark RDD

Solution 1

Starting with Spark 1.0 there are two methods you can use to solve this easily:

  • RDD.zipWithIndex is just like Seq.zipWithIndex: it adds contiguous (Long) numbers. It needs to count the elements in each partition first, so your input will be evaluated twice. Cache your input RDD if you want to use this.
  • RDD.zipWithUniqueId also gives you unique Long IDs, but they are not guaranteed to be contiguous. (They will only be contiguous if each partition has the same number of elements.) The upside is that it does not need to know anything about the input, so it will not cause double-evaluation. A quick sketch of both follows below.
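
A minimal sketch of both calls (the users RDD and its contents are hypothetical):

// Scala
val users = sc.parallelize(Seq("alice", "bob", "carol"), 2)

val contiguous = users.zipWithIndex()     // (element, Long) pairs with IDs 0, 1, 2; counts each partition first, so the input is evaluated twice
val uniqueOnly = users.zipWithUniqueId()  // unique Long IDs in a single pass, e.g. 0, 1, 3; gaps are possible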

Solution 2

For a similar example use case, I just hashed the string values. See http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/

def nnHash(tag: String) = tag.hashCode & 0x7FFFFF   // mask to the lower 23 bits to get a non-negative hash
val tagHashes = postIDTags.map(_._2).distinct.map(tag => (nnHash(tag), tag))

It sounds like you're already doing something like this, although hashing can be easier to manage.

Matei suggested an approach here for emulating zipWithIndex on an RDD, which amounts to assigning IDs within each partition that are globally unique: https://groups.google.com/forum/#!topic/spark-users/WxXvcn2gl1E
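
A rough sketch of that scheme (the tags RDD is a hypothetical input; like zipWithIndex, this evaluates the input twice, so caching helps):

// Scala
// Count the elements in each partition, then turn the counts into per-partition starting offsets.
val counts  = tags.mapPartitionsWithIndex { (i, it) => Iterator((i, it.size)) }
                  .collect().sortBy(_._1).map(_._2)
val offsets = counts.scanLeft(0L)(_ + _)
// Assign offset + local index inside each partition; the result is globally unique and contiguous.
val withIds = tags.mapPartitionsWithIndex { (i, it) =>
  it.zipWithIndex.map { case (tag, j) => (offsets(i) + j, tag) }
}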

Solution 3

Another easy option, if you are using DataFrames and are only concerned about uniqueness, is to use the function MonotonicallyIncreasingID:

import org.apache.spark.sql.functions.monotonicallyIncreasingId 
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId)

Edit: MonotonicallyIncreasingID was deprecated and removed since Spark 2.0; it is now known as monotonically_increasing_id.
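
The equivalent call under the newer name looks like this (a sketch only, using the same hypothetical df as above):

import org.apache.spark.sql.functions.monotonically_increasing_id

// Spark 2.0+ name; the IDs are unique and monotonically increasing, but not contiguous.
val newDf = df.withColumn("uniqueIdColumn", monotonically_increasing_id())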

Solution 4

monotonically_increasing_id() appears to be the answer, but unfortunately it won't work for ALS since it produces 64-bit numbers and ALS expects 32-bit ones (see my comment below radek1st's answer for details).

The solution I found is to use zipWithIndex(), as mentioned in Darabos' answer. Here's how to implement it:

If you already have a single-column DataFrame with your distinct users called userids, you can create a lookup table (LUT) as follows:

# PySpark code
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Zip each distinct userid with a contiguous index and build a two-column lookup DataFrame.
user_als_id_LUT = sqlContext.createDataFrame(
    userids.rdd.map(lambda x: x[0]).zipWithIndex(),
    StructType([StructField("userid", StringType(), True),
                StructField("user_als_id", IntegerType(), True)]))

Now you can:

  • Use this LUT to get ALS-friendly integer IDs to provide to ALS
  • Use this LUT to do a reverse-lookup when you need to go back from ALS ID to the original ID

Do the same for items, obviously.

Solution 5

People have already recommended monotonically_increasing_id(), and mentioned the problem that it creates Longs, not Ints.

However, in my experience (caveat: this was Spark 1.6), if you use it on a single partition (repartition to 1 first), no partition prefix is used, and the number can be safely cast to Int. Obviously, you need fewer than Integer.MAX_VALUE rows.
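
A sketch of that approach (df is a hypothetical DataFrame; the cast is only safe under the row-count assumption above):

import org.apache.spark.sql.functions.monotonically_increasing_id

val withIntId = df
  .repartition(1)   // one partition, so the generated IDs start at 0 with no partition prefix
  .withColumn("id", monotonically_increasing_id().cast("int"))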

Author: Dilum Ranatunga

Updated on September 12, 2020

Comments

  • Dilum Ranatunga
    Dilum Ranatunga over 3 years

    I have a dataset of (user, product, review), and want to feed it into mllib's ALS algorithm.

    The algorithm needs users and products to be numbers, while mine are String usernames and String SKUs.

    Right now, I get the distinct users and SKUs, then assign numeric IDs to them outside of Spark.

    I was wondering whether there was a better way of doing this. One approach I've thought of is to write a custom RDD that essentially enumerates 1 through n and then call zip on the two RDDs.

  • axiom
    axiom about 8 years
    This is a good idea, but one must be cautious about the number of collisions. For applications where the arity of the set being encoded (for example, tags, user names, etc.) approaches 100k, the number of collisions may be significant.
  • radek1st
    radek1st almost 8 years
    I got collisions after several thousand records already, so I wouldn't normally recommend it.
  • Sean Owen
    Sean Owen almost 8 years
    In the ALS use case, collisions don't matter that much, but that is true only up to a point. In cases where collisions are important, yes this isn't a good approach.
  • xenocyon
    xenocyon over 7 years
    This approach actually won't work for the purpose of user/item identifiers in ALS, because monotonically_increasing_id() produces 64-bit numbers (i.e. long not int), whereas "the DataFrame-based API for ALS currently only supports integers for user and item ids" (from spark.apache.org/docs/2.0.0/ml-collaborative-filtering.html)
  • radek1st
    radek1st over 7 years
    I agree, for the purpose of ALS the safest is to go with zipWithIndex().
  • Tagar
    Tagar over 6 years
    Thanks. So RDD.zipWithUniqueId doesn't scan the dataset twice?
  • Daniel Darabos
    Daniel Darabos over 6 years
  • cadama
    cadama over 5 years
    Is it possible that an executor dies and Spark ends up assigning the same ID to two distinct entries?
  • Joe S
    Joe S about 5 years
    Will this assign different IDs to the same string if it appears in the RDD twice?
  • Daniel Darabos
    Daniel Darabos about 5 years
    Yes. Each element gets a different number.