How to use Dynamic Time Warping with kNN in Python


Solution 1

You can pass a custom metric to sklearn's KNN, so you only need to implement DTW yourself (or use/adapt an existing DTW implementation in Python) [gist of this code].

import numpy as np
from scipy.spatial import distance
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report

#toy dataset 
X = np.random.random((100,10))
y = np.random.randint(0,2, (100))
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

#custom metric
def DTW(a, b):   
    an = a.size
    bn = b.size
    pointwise_distance = distance.cdist(a.reshape(-1,1),b.reshape(-1,1))
    cumdist = np.full((an+1, bn+1), np.inf)  # np.matrix is deprecated; use a plain ndarray
    cumdist[0,0] = 0

    for ai in range(an):
        for bi in range(bn):
            minimum_cost = np.min([cumdist[ai, bi+1],
                                   cumdist[ai+1, bi],
                                   cumdist[ai, bi]])
            cumdist[ai+1, bi+1] = pointwise_distance[ai,bi] + minimum_cost

    return cumdist[an, bn]

#train
parameters = {'n_neighbors':[2, 4, 8]}
clf = GridSearchCV(KNeighborsClassifier(metric=DTW), parameters, cv=3, verbose=1)
clf.fit(X_train, y_train)



#evaluate
y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred))

Which yields

Fitting 3 folds for each of 3 candidates, totalling 9 fits        

[Parallel(n_jobs=1)]: Done   9 out of   9 | elapsed:   29.0s finished

                         precision    recall  f1-score   support

                      0       0.57      0.89      0.70        18
                      1       0.60      0.20      0.30        15

            avg / total       0.58      0.58      0.52        33
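
Note that the pure-Python DTW metric makes the grid search slow (about 29 s even for the toy data above). An optional tweak, which is standard scikit-learn rather than anything DTW-specific, is to run the search in parallel across all cores:

clf = GridSearchCV(KNeighborsClassifier(metric=DTW), parameters,
                   cv=3, n_jobs=-1, verbose=1)
clf.fit(X_train, y_train)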

Solution 2

Use dtaidistance. This is a simplified version of the pipeline I use to find the warping-window width, between 1 and 20, that gives the best fit:

import numpy as np
from dtaidistance import dtw
from sklearn.metrics import f1_score

def knn(trainX, trainY, testX, testY, w):
    predictions = np.zeros(len(testX))

    for testSampleIndex, testSample in enumerate(testX):
        minimumDistance = float('inf')
        for trainingSampleIndex, trainingSample in enumerate(trainX):
            # max_dist lets dtaidistance abandon a computation early once it
            # exceeds the best distance found so far (1-NN pruning)
            distanceBetweenTestAndTrainingAScan = dtw.distance(
                testSample, trainingSample, use_c=True, window=w, max_dist=minimumDistance)
            if distanceBetweenTestAndTrainingAScan < minimumDistance:
                minimumDistance = distanceBetweenTestAndTrainingAScan
                predictions[testSampleIndex] = trainY[trainingSampleIndex]

    return [testY, predictions]

def DTWForCurrentDataSet(testX, testY, trainX, trainY, testDataSetID):
    testDataSetBestF1Score = -float("inf")
    testDataSetBestPredictions = []
    # try warping-window widths 1..20 and keep the best-scoring predictions
    for w in range(1, 21):
        [testY, predictions] = knn(trainX, trainY, testX, testY, w)

        microF1Score = f1_score(testY, predictions, average='micro')
        if microF1Score > testDataSetBestF1Score:
            testDataSetBestF1Score = microF1Score
            testDataSetBestPredictions = predictions
    return testDataSetBestPredictions

def runDTW(database):
    for testDataSetID in database:
        # createTestingAndTrainingSets is the author's own helper (not shown here)
        [testX, testY, trainX, trainY, patientIDsForTraining] = createTestingAndTrainingSets(database, testDataSetID)
        testDataSetBestPredictions = DTWForCurrentDataSet(testX, testY, trainX, trainY, testDataSetID)
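
As a quick sanity check that dtaidistance is installed and that the window parameter behaves as expected, here is a minimal standalone snippet (the series values are made up):

import numpy as np
from dtaidistance import dtw

s1 = np.array([0., 0, 1, 2, 1, 0, 1, 0, 0])
s2 = np.array([0., 1, 2, 0, 0, 0, 0, 0, 0])
print(dtw.distance(s1, s2))            # unconstrained DTW distance
print(dtw.distance(s1, s2, window=2))  # constrained warping window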


Comments

  • EmJ
    EmJ over 1 year

    I have a time-series dataset with two labels (0 and 1). I am using Dynamic Time Warping (DTW) as a similarity measure for classification with k-nearest neighbours (kNN), as described in these two wonderful blog posts:

    • https://nbviewer.jupyter.org/github/markdregan/K-Nearest-Neighbors-with-Dynamic-Time-Warping/blob/master/K_Nearest_Neighbor_Dynamic_Time_Warping.ipynb
    • http://alexminnaar.com/2014/04/16/Time-Series-Classification-and-Clustering-with-Python.html

    The snippet below is the kNN + DTW class from the first post (shown here ported to Python 3: xrange → range, sys.maxint → np.inf). The paste was missing the imports and the class header; they are restored below, and ProgressBar (a small progress-bar helper defined in the linked notebook) is stubbed out so the snippet is self-contained:

    import numpy as np
    from scipy.spatial.distance import squareform
    from scipy.stats import mode

    class ProgressBar:
        # minimal stand-in for the notebook's ProgressBar helper
        def __init__(self, iterations):
            pass

        def animate(self, count):
            pass

    class KnnDtw(object):
      """K-nearest neighbor classifier using dynamic time warping
      as the distance measure between pairs of time series arrays

      Arguments
      ---------
      n_neighbors : int, optional (default = 5)
          Number of neighbors to use by default for KNN
      
      max_warping_window : int, optional (default = infinity)
          Maximum warping window allowed by the DTW dynamic
          programming function
      
      subsample_step : int, optional (default = 1)
          Step size for the timeseries array. By setting subsample_step = 2,
          the timeseries length will be reduced by 50% because every second
          item is skipped. Implemented by x[:, ::subsample_step]
      """
      
      def __init__(self, n_neighbors=5, max_warping_window=10000, subsample_step=1):
          self.n_neighbors = n_neighbors
          self.max_warping_window = max_warping_window
          self.subsample_step = subsample_step
      
      def fit(self, x, l):
          """Fit the model using x as training data and l as class labels
      
          Arguments
          ---------
          x : array of shape [n_samples, n_timepoints]
              Training data set for input into KNN classifer
      
          l : array of shape [n_samples]
              Training labels for input into KNN classifier
          """
      
          self.x = x
          self.l = l
      
      def _dtw_distance(self, ts_a, ts_b, d=lambda x, y: abs(x - y)):
          """Returns the DTW distance between two 1-D
          timeseries numpy arrays.

          Arguments
          ---------
          ts_a, ts_b : array of shape [n_timepoints]
              Two 1-D arrays, each containing a single timeseries,
              whose DTW distance will be computed
      
          d : DistanceMetric object (default = abs(x-y))
              the distance measure used for A_i - B_j in the
              DTW dynamic programming function
      
          Returns
          -------
          DTW distance between A and B
          """
      
          # Create cost matrix, initialized to infinity
          # (Python 3 removed sys.maxint)
          ts_a, ts_b = np.array(ts_a), np.array(ts_b)
          M, N = len(ts_a), len(ts_b)
          cost = np.full((M, N), np.inf)
      
          # Initialize the first row and column
          cost[0, 0] = d(ts_a[0], ts_b[0])
          for i in range(1, M):
              cost[i, 0] = cost[i-1, 0] + d(ts_a[i], ts_b[0])

          for j in range(1, N):
              cost[0, j] = cost[0, j-1] + d(ts_a[0], ts_b[j])
      
          # Populate rest of cost matrix within window
          for i in range(1, M):
              for j in range(max(1, i - self.max_warping_window),
                             min(N, i + self.max_warping_window)):
                  choices = cost[i - 1, j - 1], cost[i, j-1], cost[i-1, j]
                  cost[i, j] = min(choices) + d(ts_a[i], ts_b[j])
      
          # Return DTW distance given window 
          return cost[-1, -1]
      
      def _dist_matrix(self, x, y):
          """Computes the M x N distance matrix between the training
          dataset and testing dataset (y) using the DTW distance measure
      
          Arguments
          ---------
          x : array of shape [n_samples, n_timepoints]
      
          y : array of shape [n_samples, n_timepoints]
      
          Returns
          -------
          Distance matrix between each item of x and y with
              shape [n_samples_x, n_samples_y]
          """
      
          # Compute the distance matrix        
          dm_count = 0
      
          # Compute condensed distance matrix (upper triangle) of pairwise dtw distances
          # when x and y are the same array
          if np.array_equal(x, y):
              x_s = np.shape(x)
              dm = np.zeros((x_s[0] * (x_s[0] - 1)) // 2, dtype=np.double)

              p = ProgressBar(dm.shape[0])
      
              for i in range(0, x_s[0] - 1):
                  for j in range(i + 1, x_s[0]):
                      dm[dm_count] = self._dtw_distance(x[i, ::self.subsample_step],
                                                        y[j, ::self.subsample_step])
      
                      dm_count += 1
                      p.animate(dm_count)
      
              # Convert to squareform
              dm = squareform(dm)
              return dm
      
          # Compute full distance matrix of dtw distances between x and y
          else:
              x_s = np.shape(x)
              y_s = np.shape(y)
              dm = np.zeros((x_s[0], y_s[0])) 
              dm_size = x_s[0]*y_s[0]
      
              p = ProgressBar(dm_size)
      
              for i in range(0, x_s[0]):
                  for j in range(0, y_s[0]):
                      dm[i, j] = self._dtw_distance(x[i, ::self.subsample_step],
                                                    y[j, ::self.subsample_step])
                      # Update progress bar
                      dm_count += 1
                      p.animate(dm_count)
      
              return dm
      
      def predict(self, x):
          """Predict the class labels or probability estimates for 
          the provided data
      
          Arguments
          ---------
            x : array of shape [n_samples, n_timepoints]
                Array containing the testing data set to be classified
      
          Returns
          -------
            2 arrays representing:
                (1) the predicted class labels 
                (2) the knn label count probability
          """
      
          dm = self._dist_matrix(x, self.x)
      
          # Identify the k nearest neighbors
          knn_idx = dm.argsort()[:, :self.n_neighbors]
      
          # Identify k nearest labels
          knn_labels = self.l[knn_idx]
      
          # Majority vote: the mode of the k nearest labels
          mode_data = mode(knn_labels, axis=1)
          mode_label = mode_data[0]
          mode_proba = mode_data[1]/self.n_neighbors
      
          return mode_label.ravel(), mode_proba.ravel()
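
    A minimal usage sketch for the class above, with made-up toy data (the shapes are hypothetical):

      m = KnnDtw(n_neighbors=3, max_warping_window=10)
      m.fit(np.random.random((20, 50)), np.random.randint(0, 2, 20))
      labels, proba = m.predict(np.random.random((5, 50)))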
      

    However, for classification with kNN, the two posts use their own kNN implementations.

    I want to use sklearn's options such as GridSearchCV in my classification. Therefore, I would like to know how I can use Dynamic Time Warping (DTW) with sklearn's kNN.

    Note: I am not limited to sklearn and am happy to receive answers using other libraries as well.

    I am happy to provide more details if needed.

  • EmJ
    EmJ almost 4 years
    Hi, I am using dtaidistance as you suggested. However, I got the following issue while using the library: stackoverflow.com/questions/62211066/… Please let me know your thoughts, thank you :)
  • user2783767
    user2783767 over 2 years
    How to use this solution for multi feature time series. stackoverflow.com/questions/70227646/…
  • Nikolas Rieble
    Nikolas Rieble over 2 years
    @user2783767 In the sample code I provided, I am using docs.scipy.org/doc/scipy/reference/generated/… which already works on multidimensional input. Note: I have not yet tried this with multidimensional data myself, but I do not see why it should not work.
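
    A sketch of the idea in this comment, applied to the metric from Solution 1: scipy's cdist accepts 2-D inputs, so the pointwise-distance step can handle multivariate series if each flattened sample is reshaped back to (n_timepoints, n_features). This is untested, as noted above, and N_FEATURES is a hypothetical parameter:

      import numpy as np
      from scipy.spatial import distance

      N_FEATURES = 2  # assumption: number of features per time point

      def DTW_multivariate(a, b):
          # sklearn's KNN passes flat 1-D vectors to the metric,
          # so restore the (n_timepoints, n_features) shape first
          a = a.reshape(-1, N_FEATURES)
          b = b.reshape(-1, N_FEATURES)
          an, bn = len(a), len(b)
          # Euclidean distance between multivariate time points
          pointwise_distance = distance.cdist(a, b)
          cumdist = np.full((an + 1, bn + 1), np.inf)
          cumdist[0, 0] = 0
          for ai in range(an):
              for bi in range(bn):
                  cumdist[ai + 1, bi + 1] = pointwise_distance[ai, bi] + min(
                      cumdist[ai, bi + 1], cumdist[ai + 1, bi], cumdist[ai, bi])
          return cumdist[an, bn]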