how to calculate running median efficiently

16,337

Solution 1

One approach is below:

def RunningMedian(x,N):
    idx = np.arange(N) + np.arange(len(x)-N+1)[:,None]
    b = [row[row>0] for row in x[idx]]
    return np.array(map(np.median,b))
    #return np.array([np.median(c) for c in b])  # This also works

I found a much faster one (tens of thousand times faster), copied as below:

from collections import deque
from bisect import insort, bisect_left
from itertools import islice
def running_median_insort(seq, window_size):
    """Contributed by Peter Otten"""
    seq = iter(seq)
    d = deque()
    s = []
    result = []
    for item in islice(seq, window_size):
        d.append(item)
        insort(s, item)
        result.append(s[len(d)//2])
    m = window_size // 2
    for item in seq:
        old = d.popleft()
        d.append(item)
        del s[bisect_left(s, old)]
        insort(s, item)
        result.append(s[m])
    return result

Take a look at the link: running_median

Solution 2

You can use two heaps to maintain the lower and higher halves of the data sample as you process it. The algorithm goes like this: for each value, put it to an appropriate heap and 'rebalance' the heaps so that their size is not different by more than 1. Then, to get the median, just pull out the first element from the bigger heap, or take an average of the first elements of the two heaps if they are of equal size. This solution has O(n log(n)) time complexity.

from heapq import heappush, heappop


class RunningMedian:
    def __init__(self):
        self.lowers, self.highers = [], []

    def add_number(self, number):
        if not self.highers or number > self.highers[0]:
            heappush(self.highers, number)
        else:
            heappush(self.lowers, -number)  # for lowers we need a max heap
        self.rebalance()

    def rebalance(self):
        if len(self.lowers) - len(self.highers) > 1:
            heappush(self.highers, -heappop(self.lowers))
        elif len(self.highers) - len(self.lowers) > 1:
            heappush(self.lowers, -heappop(self.highers))

    def get_median(self):
        if len(self.lowers) == len(self.highers):
            return (-self.lowers[0] + self.highers[0])/2
        elif len(self.lowers) > len(self.highers):
            return -self.lowers[0]
        else:
            return self.highers[0]

Demo:

>>> running_median = RunningMedian()
>>> for n in (12, 4, 5, 3, 8, 7):
...     running_median.add_number(n)
...     print(running_median.get_median())
... 
12
8.0
5
4.5
5
6.0
Share:
16,337
Huanian Zhang
Author by

Huanian Zhang

Updated on June 05, 2022

Comments

  • Huanian Zhang
    Huanian Zhang almost 2 years

    I borrowed some code trying to implement a function to calculate the running median for a ton of data. The current one is too slow for me (The tricky part is that I need to exclude all zeros from the running box). Below is the code:

    from itertools import islice
    from collections import deque
    from bisect import bisect_left,insort
    
    def median(s):
        sp = [nz for nz in s if nz!=0]
        print sp
        Mnow = len(sp)
        if Mnow == 0:
            return 0
        else:
            return np.median(sp)
    
    def RunningMedian(seq, M):
        seq = iter(seq)
        s = []
    
        # Set up list s (to be sorted) and load deque with first window of seq
        s = [item for item in islice(seq,M)]
        d = deque(s)
    
        # Sort it in increasing order and extract the median ("center" of the sorted window)
        s.sort()
        medians = [median(s)]
        for item in seq:
            old = d.popleft()          # pop oldest from left
            d.append(item)             # push newest in from right
            del s[bisect_left(s, old)] # locate insertion point and then remove old 
            insort(s, item)            # insert newest such that new sort is not required        
            medians.append(median(s))
        return medians
    

    It works well, the only drawback is that it is too slow. Any one could help me to improve the code to be more efficient?

    After I explored all the possibilities, the following simple code can calculate comparably efficiently:

    def RunningMedian(x,N):
        idx = np.arange(N) + np.arange(len(x)-N+1)[:,None]
        b = [row[row>0] for row in x[idx]]
        return np.array(map(np.median,b))
        #return np.array([np.median(c) for c in b])  # This also works