Faster Way to Implement Gaussian Smoothing? (Python 3.10, NumPy)

I’m attempting to implement a Gaussian smoothing/flattening function in my Python 3.10 script to flatten a set of XY points. For each data point, I create a Y buffer and a Gaussian kernel, which I use to flatten each Y point based on its neighbours.

I’m using the NumPy module for my data arrays, and Matplotlib to plot the data.

I wrote a minimal reproducible example with some randomly generated data; each of the arguments needed for the Gaussian function is listed at the top of the main function:

import numpy as np
import matplotlib.pyplot as plt
import time

def main():
    dataSize = 1000
    yDataRange = [-4, 4]
    reachPercentage = 0.1
    sigma = 10
    phi = 0
    amplitude = 1

    testXData = np.arange(dataSize)
    testYData = np.random.uniform(low = yDataRange[0], high = yDataRange[1], size = dataSize)

    print("Flattening...")
    startTime = time.time()

    flattenedYData = GaussianFlattenData(testXData, testYData, reachPercentage, sigma, phi, amplitude)

    totalTime = round(time.time() - startTime, 2)
    print("Flattened! (" + str(totalTime) + " sec)")

    plt.title(str(totalTime) + " sec")
    plt.plot(testXData, testYData, label = "Original Data")
    plt.plot(testXData, flattenedYData, label = "Flattened Data")
    plt.legend()

    plt.show()

    plt.close()

def GaussianFlattenData(xData, yData, reachPercentage, sigma, phi, amplitude):
    flattenedYData = np.empty(shape = len(xData), dtype = float)

    # For each data point, create a Y buffer and a Gaussian kernel, and flatten it based on its neighbours
    for i in range(len(xData)):
        gaussianCenter = xData[i]
        baseReachEdges = GetGaussianValueX((GetGaussianValueY(0, 0, sigma, phi, amplitude) * reachPercentage), 0, sigma, phi, amplitude)
        reachEdgeIndices = [FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[0]), xData)), 
                            FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[1]), xData))]
        currDataScanNum = reachEdgeIndices[0] - reachEdgeIndices[1]

        # Creating Y buffer and Gaussian kernel...
        currYPoints = np.empty(shape = currDataScanNum, dtype = float)
        kernel = np.empty(shape = currDataScanNum, dtype = float)

        for j in range(currDataScanNum):
            currYPoints[j] = yData[j + reachEdgeIndices[1]]
            kernel[j] = GetGaussianValueY(j, (i - reachEdgeIndices[1]), sigma, phi, amplitude)

        # Dividing kernel by its sum...
        kernelSum = np.sum(kernel)

        for j in range(len(kernel)):
            kernel[j] = (kernel[j] / kernelSum)

        # Acquiring the current flattened Y point...
        for j in range(len(currYPoints)):
            currYPoints[j] = currYPoints[j] * kernel[j]

        flattenedYData[i] = np.sum(currYPoints)

    return flattenedYData

def GetGaussianValueX(y, mu, sigma, phi, amplitude):
    x = ((sigma * np.sqrt(-2 * np.log(y / (amplitude * np.cos(phi))))) + mu)

    return [x, (mu - (x - mu))]

def GetGaussianValueY(x, mu, sigma, phi, amplitude):
    y = ((amplitude * np.cos(phi)) * np.exp(-np.power(((x - mu) / sigma), 2) / 2))

    return y

def GetClosestNum(base, nums):
    closestIdx = 0
    closestDiff = np.abs(base - nums[0])
    idx = 1

    while (idx < len(nums)):
        currDiff = np.abs(base - nums[idx])

        if (currDiff < closestDiff):
            closestDiff = currDiff
            closestIdx = idx

        idx += 1

    return nums[closestIdx]

def FindInArray(arr, value):
    for i in range(len(arr)):
        if (arr[i] == value):
            return i

    return -1

if (__name__ == "__main__"):
    main()

In the example above, I generate 1,000 random data points in the range -4 to 4. The reachPercentage variable is the fraction of the Gaussian’s peak amplitude above which Gaussian values are inserted into the kernel. The sigma, phi and amplitude variables are the inputs to the Gaussian function that generates the Gaussians used to smooth each Y data point.
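
To make reachPercentage concrete: with the values above (amplitude = 1, phi = 0, reachPercentage = 0.1, sigma = 10), each kernel reaches out to where the Gaussian has dropped to 10% of its peak, which is sigma * sqrt(-2 * ln(0.1)) ≈ 2.15 * sigma, i.e. about 21 points on either side of the centre.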

I wrote some additional utility functions which I needed as well.

The script above successfully smooths the generated data. In the resulting plot, blue is the original data and orange is the flattened data.

However, it takes a surprisingly long time to smooth even small amounts of data. In the example above I generated 1,000 data points, and it takes ~7.5 seconds to flatten them. With datasets exceeding 10,000 points, it can easily take over 10 minutes.

Since this is a very popular and well-known way of smoothing data, I was wondering why this script runs so slowly. I originally implemented this with standard Python lists and append calls, but that was extremely slow. I hoped that switching to NumPy arrays and avoiding append would make it faster, but that is not really the case.

Is there a way to speed up this process? Is there a Gaussian-smoothing function that already exists out there, that takes in the same arguments, and that could do the job faster?

Thanks for reading my post, any guidance is appreciated.

It looks like you’re using numpy arrays, but none of the array arithmetic that numpy was written for.

For example, you’re doing this:

# Dividing kernel by its sum...
kernelSum = np.sum(kernel)

for j in range(len(kernel)):
    kernel[j] = (kernel[j] / kernelSum)

when you could be doing this:

# Dividing kernel by its sum...
kernel /= np.sum(kernel)

This divides all of the values in the array by the sum.
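
The same idea applies to the multiply-then-sum step at the end of your outer loop; as a sketch, using the variable names from your code:

# Weighted sum in a single vectorized expression
# (replaces the second inner loop and the final np.sum)
flattenedYData[i] = np.sum(currYPoints * kernel)

An elementwise product followed by np.sum is also just a dot product, so np.dot(currYPoints, kernel) gives the same result.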

Thank you for your reply!

That makes a lot of sense, and it did improve the speed of my code slightly. Thanks for pointing that out.

To get a better sense of which parts of my code take up the most time, I subdivided my code into 4 sections and timed each one to see which took the largest amount of time.

To my surprise, the parts of the code that took up over 90% of the time were actually the first 2 parts of the for loop:

        gaussianCenter = xData[i]
        baseReachEdges = GetGaussianValueX((GetGaussianValueY(0, 0, sigma, phi, amplitude) * reachPercentage), 0, sigma, phi, amplitude)
        reachEdgeIndices = [FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[0]), xData)), 
                            FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[1]), xData))]
        currDataScanNum = reachEdgeIndices[0] - reachEdgeIndices[1]

        # Creating Y buffer and Gaussian kernel...
        currYPoints = np.empty(shape = currDataScanNum, dtype = float)
        kernel = np.empty(shape = currDataScanNum, dtype = float)

        for j in range(currDataScanNum):
            currYPoints[j] = yData[j + reachEdgeIndices[1]]
            kernel[j] = GetGaussianValueY(j, (i - reachEdgeIndices[1]), sigma, phi, amplitude)

Specifically, the first part where I calculate the reach edges was taking up over 80% of the time.

I managed to find a faster alternative to the GetClosestNum function in this StackOverflow post:

def GetClosestNum(base, nums):
    nums = np.asarray(nums)

    return nums[(np.abs(nums - base)).argmin()]
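
As a quick sanity check on a small array:

GetClosestNum(3.7, np.array([0, 1, 2, 3, 4, 5]))    # returns 4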

And I also found a faster alternative to the FindInArray function, so I changed the first part of the slower snippet to:

        gaussianCenter = xData[i]
        baseReachEdges = GetGaussianValueX((GetGaussianValueY(0, 0, sigma, phi, amplitude) * reachPercentage), 0, sigma, phi, amplitude)
        reachEdgeIndices = [np.where(xData == GetClosestNum((gaussianCenter + baseReachEdges[0]), xData))[0][0], 
                            np.where(xData == GetClosestNum((gaussianCenter + baseReachEdges[1]), xData))[0][0]]
        currDataScanNum = reachEdgeIndices[0] - reachEdgeIndices[1]
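
As an aside, since argmin already returns the index of the closest value, the two steps could in principle be fused; something like this should be equivalent, though I have kept the two-step version for now:

        reachEdgeIndices = [int(np.abs(xData - (gaussianCenter + baseReachEdges[0])).argmin()),
                            int(np.abs(xData - (gaussianCenter + baseReachEdges[1])).argmin())]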

While the code is now MUCH faster (it now takes ~0.8 seconds instead of ~7.5 to smooth the 1,000 data points in the example), the following section could still be improved slightly:

        # Creating Y buffer and Gaussian kernel...
        currYPoints = np.empty(shape = currDataScanNum, dtype = float)
        kernel = np.empty(shape = currDataScanNum, dtype = float)

        for j in range(currDataScanNum):
            currYPoints[j] = yData[j + reachEdgeIndices[1]]
            kernel[j] = GetGaussianValueY(j, (i - reachEdgeIndices[1]), sigma, phi, amplitude)

I’m not quite sure if there’s a way to make that faster as well. Are there any NumPy array expressions that could help me with the snippet above?

My data (from a Geiger counter) look similar, and are easily smoothed to a similar outcome by a simple convolution, using the numpy function:

np.convolve(data, np.ones((N,)) / N, mode='same')

The yellow double line in the middle of my plot is the moving average. The data are 200,000 random values drawn from a Poisson distribution. Smoothing, including plotting, takes 212.9 milliseconds.

I am using a rectangle of “np.ones” to smooth. You could instead use a Gaussian function, if that is essential.
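
If the Gaussian shape is needed, the same one-line convolution works with a normalized Gaussian kernel in place of the rectangle. A minimal sketch, assuming numpy is imported as np and that data, N (kernel length in samples) and sigma (kernel width in samples) are defined:

x = np.arange(N) - (N - 1) / 2
gaussKernel = np.exp(-0.5 * (x / sigma) ** 2)
gaussKernel /= gaussKernel.sum()    # normalize so smoothing preserves the overall level
smoothed = np.convolve(data, gaussKernel, mode='same')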

Maybe try scipy.ndimage.gaussian_filter1d?
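
For example, a minimal sketch applied to the data from the original post (sigma is in units of samples):

from scipy.ndimage import gaussian_filter1d

flattenedYData = gaussian_filter1d(testYData, sigma = 10)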

Well, the 2 calculations in the body of the loop seem to be independent, so the first step would be to separate them:

        currYPoints = np.empty(shape = currDataScanNum, dtype = float)

        for j in range(currDataScanNum):
            currYPoints[j] = yData[j + reachEdgeIndices[1]]

        kernel = np.empty(shape = currDataScanNum, dtype = float)

        for j in range(currDataScanNum):
            kernel[j] = GetGaussianValueY(j, (i - reachEdgeIndices[1]), sigma, phi, amplitude)

From this it appears that currYPoints is just a slice of yData, giving:

        currYPoints = yData[reachEdgeIndices[1] : reachEdgeIndices[1] + currDataScanNum]

        kernel = np.empty(shape = currDataScanNum, dtype = float)

        for j in range(currDataScanNum):
            kernel[j] = GetGaussianValueY(j, (i - reachEdgeIndices[1]), sigma, phi, amplitude)

I’m not sure of the element type of yData, so you might need to add a type conversion.
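
If a conversion does turn out to be necessary, something along these lines should work (astype returns a copy):

        currYPoints = yData[reachEdgeIndices[1] : reachEdgeIndices[1] + currDataScanNum].astype(float)

One caveat with the plain slice: it is a view into yData rather than a copy, so currYPoints should not be modified in place.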

The second loop as it’s currently written calls GetGaussianValueY for each value of j, and it’s doing:

    ((amplitude * np.cos(phi)) * np.exp(-np.power(((x - mu) / sigma), 2) / 2))

(x is the argument j).

Replace range(currDataScanNum) with np.arange(currDataScanNum) to make a numpy array of the indexes and then use numpy’s array arithmetic:

        kernel = GetGaussianValueY(np.arange(currDataScanNum), (i - reachEdgeIndices[1]), sigma, phi, amplitude)
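
This works because GetGaussianValueY uses only numpy functions (np.exp, np.power) and ordinary arithmetic, all of which operate elementwise on arrays, so passing an array of indexes returns the whole kernel in one call.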

Thanks for your replies!

Thank you Matthew, that compression of the for loop definitely did it; the code is now much faster. I had no idea you could operate on an entire array in a single expression like you showed.

The final code for GaussianFlattenData is as follows:

def GaussianFlattenData(xData, yData, reachPercentage, sigma, phi, amplitude):
    flattenedYData = np.empty(shape = len(xData), dtype = float)

    # For each data point, create a Y buffer and a Gaussian kernel, and flatten it based on its neighbours
    for i in range(len(xData)):
        gaussianCenter = xData[i]
        baseReachEdges = GetGaussianValueX((GetGaussianValueY(0, 0, sigma, phi, amplitude) * reachPercentage), 0, sigma, phi, amplitude)
        reachEdgeIndices = [np.where(xData == GetClosestNum((gaussianCenter + baseReachEdges[0]), xData))[0][0], 
                            np.where(xData == GetClosestNum((gaussianCenter + baseReachEdges[1]), xData))[0][0]]
        currDataScanNum = reachEdgeIndices[0] - reachEdgeIndices[1]

        # Creating Y buffer and Gaussian kernel...
        currYPoints = yData[reachEdgeIndices[1] : reachEdgeIndices[1] + currDataScanNum]
        kernel = GetGaussianValueY(np.arange(currDataScanNum), (i - reachEdgeIndices[1]), sigma, phi, amplitude)

        # Acquiring the current flattened Y point...
        flattenedYData[i] = np.sum(currYPoints * (kernel / np.sum(kernel)))

    return flattenedYData

Thanks all!