How to find the nearest neighbors of 1 Billion records with Spark?


Solution 1

Performing a brute-force comparison of all records against all records is a losing battle. My suggestion would be to use a ready-made implementation of the k-Nearest Neighbors algorithm, such as the one provided by scikit-learn, then broadcast the resulting arrays of indices and distances and proceed from there.

Steps in this case would be:

1- vectorize the features as Bryce suggested, and have your vectorizing method return a list (or numpy array) of floats with as many elements as you have features

2- fit your scikit-learn NearestNeighbors model to your data:

nbrs = NearestNeighbors(n_neighbors=10, algorithm='auto').fit(vectorized_data)

3- run the trained model on your vectorized data (training and query data are the same in your case):

distances, indices = nbrs.kneighbors(qpa)

Steps 2 and 3 will run on your PySpark driver node and are not parallelizable in this setup, so you will need enough memory on that node. In my case, with 1.5 million records and 4 features, it took a second or two.
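Putting steps 1-3 together, a minimal driver-side sketch might look like the following (toy random data stands in for the real vectorized features; the variable names are illustrative):

```python
# Sketch of steps 1-3 on the driver node. Toy data stands in for the
# real vectorized features; in practice this would be, e.g., 1.5M rows.
import numpy as np
from sklearn.neighbors import NearestNeighbors

rng = np.random.default_rng(0)
vectorized_data = rng.random((1000, 4))  # 1000 records, 4 features each

# Step 2: fit on the full dataset.
nbrs = NearestNeighbors(n_neighbors=10, algorithm='auto').fit(vectorized_data)

# Step 3: query the dataset against itself.
distances, indices = nbrs.kneighbors(vectorized_data)

print(indices.shape)    # (1000, 10): the 10 nearest-neighbor indices per row
print(distances.shape)  # (1000, 10): the matching distances
```

Note that when the query set equals the training set, the first neighbor of each point is the point itself at distance 0, so you may want `n_neighbors=11` and to drop the first column.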

Until we get a good implementation of nearest neighbors for Spark, I guess we will have to stick to these workarounds. If you would rather try something new, have a look at http://spark-packages.org/package/saurfang/spark-knn

Solution 2

As it happens, I have a solution to this, involving combining sklearn with Spark: https://adventuresindatascience.wordpress.com/2016/04/02/integrating-spark-with-scikit-learn-visualizing-eigenvectors-and-fun/

The gist of it is:

  • Use sklearn’s k-NN fit() method centrally
  • But then use sklearn’s k-NN kneighbors() method distributedly
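The pattern can be sketched without a running cluster (plain Python lists stand in for RDD partitions here; in real PySpark you would broadcast the fitted model with `sc.broadcast(nbrs)` and query it inside `rdd.mapPartitions`):

```python
# A Spark-free sketch of the "fit centrally, query distributedly" pattern.
# Plain Python lists simulate the partitions; with Spark, each executor
# would receive the broadcast model and run kneighbors on its own slice.
import numpy as np
from sklearn.neighbors import NearestNeighbors

rng = np.random.default_rng(42)
data = rng.random((400, 5))

# Central step: fit k-NN once on the driver.
nbrs = NearestNeighbors(n_neighbors=10).fit(data)

# Distributed step: each "partition" queries the model locally.
partitions = np.array_split(data, 4)

def query_partition(part, model):
    distances, indices = model.kneighbors(part)
    return list(zip(indices, distances))

results = [pair for part in partitions
           for pair in query_partition(part, nbrs)]
# `results` holds one (indices, distances) pair per input row.
```

The fit stays cheap to broadcast because the model only stores the training matrix plus its index structure, while the expensive all-rows query is spread across workers.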

Solution 3

You haven't provided a lot of detail, but the general approach I would take to this problem would be to:

  1. Convert the records to a data structure like a LabeledPoint with (ID, x1..x100) as label and features
  2. Map over each record and compare that record to all the other records (lots of room for optimization here)
  3. Create some cutoff logic so that once you start comparing ID = 5 with ID = 1 you interrupt the computation because you have already compared ID = 1 with ID = 5
  4. Some reduce step to get a data structure like {id_pair: [1,5], distance: 123}
  5. Another map step to find the 10 closest neighbors of each record

You've mentioned pyspark, and I generally do this type of work in Scala, but some pseudocode for each step might look like this:

# 1. vectorize the features
def vectorize_raw_data(record):
    arr_of_features = record[1:101]
    return LabeledPoint(record[0], arr_of_features)

# 2, 3 + 4: map over each record for comparison
broadcast_var = []
def calc_distance(record, comparison):
    # here you want to keep a broadcast variable with a list or dictionary of
    # already-compared ID pairs, and skip the pair if it already exists;
    # then calc the Euclidean distance by mapping over the features of
    # the record and subtracting the values, squaring the differences, keeping
    # a running sum of those squares, and square-rooting that sum
    return {"id_pair": [1, 5], "distance": 123}

for record in allRecords:
    for comparison in allRecords:
        broadcast_var.append(calc_distance(record, comparison))

# 5. map for the 10 closest neighbors
def closest_neighbors(record, n=10):
    matches = [x for x in broadcast_var if record.id in x["id_pair"]]
    return sorted(matches, key=lambda x: x["distance"])[:n]

The pseudocode is rough, but I think it communicates the intent. There will be a lot of shuffling and sorting here, since you are comparing all records with all other records. IMHO, you want to store the key-pair/distance results in a central place (like a broadcast variable that gets updated, though this is dangerous) to reduce the total number of Euclidean distance calculations you perform.
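To make the idea concrete, here is a runnable single-machine rendering of steps 2-5 on toy data (a sketch, not the answer's Spark code: `itertools.combinations` stands in for the "cutoff" of step 3 by emitting each unordered ID pair exactly once, and in Spark the pair generation would come from a cartesian or self-join rather than nested loops):

```python
# Runnable toy version of steps 2-5: one distance per unordered pair,
# then the top-n neighbors per ID. Four 3-feature records stand in for
# the 1B x 100 dataset.
import heapq
import itertools
import math
from collections import defaultdict

records = {
    1: [0.1, 0.12, 1.3],
    2: [-1.0, 1.2, 2.0],
    3: [0.0, 0.1, 1.2],
    4: [5.0, 5.0, 5.0],
}

def euclidean(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

# Steps 2-4: combinations() yields each pair once, so (1, 5) is never
# recomputed as (5, 1) -- this is the cutoff logic of step 3.
pair_distances = [
    {"id_pair": [i, j], "distance": euclidean(records[i], records[j])}
    for i, j in itertools.combinations(records, 2)
]

# Step 5: collect each ID's distances and keep the n smallest
# (n=10 in the question; n=2 fits this toy data).
neighbors = defaultdict(list)
for pd in pair_distances:
    i, j = pd["id_pair"]
    neighbors[i].append((pd["distance"], j))
    neighbors[j].append((pd["distance"], i))

top = {rid: heapq.nsmallest(2, dists) for rid, dists in neighbors.items()}
```

For N records this still does N*(N-1)/2 distance computations, which is why the brute-force route does not scale to a billion rows; the combinations trick only halves the work.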

Author: Osiris

Updated on June 08, 2022

Comments

  • Osiris, almost 2 years ago:

    Given 1 Billion records containing following information:

        ID  x1  x2  x3  ... x100
        1   0.1  0.12  1.3  ... -2.00
        2   -1   1.2    2   ... 3
        ...
    

    For each ID above, I want to find the top 10 closest IDs, based on Euclidean distance of their vectors (x1, x2, ..., x100).

    What's the best way to compute this?