Micro-optimization challenge: Count increasing maximums

Input is a tuple of positive integers, for example:

xs = 3, 1, 4, 1, 5, 9, 2, 6

Going through that and looking for larger and larger values, we find these:

3, 4, 5, 9

So the answer is 4, as there are four values larger than all previous.

Challenge: Write a function maxes(xs) for this, so maxes((3, 1, 4, 1, 5, 9, 2, 6)) should return 4. The fastest solution wins. The benchmark will use

xs = tuple(accumulate(
    random.choices(range(-1, 3), k=10**5),
    initial=10**6
))

executed on Attempt This Online! with the script below. The correct result is around 35000.

Side challenge: Write the prettiest solution.

I hope the task is clear. I won’t provide a reference solution because I don’t want to take any solutions away, but I’ll confirm the first correct solution as correct.

Benchmark script
def maxes_1(xs):
    ...

def maxes_2(xs):
    ...

def maxes_3(xs):
    ...

funcs = [
    maxes_1,
    maxes_2,
    maxes_3,
]

from itertools import accumulate
from time import perf_counter as time
from statistics import mean, stdev
import random
import sys

# Correctness
for _ in range(10):
    xs = tuple(accumulate(random.choices(range(-1, 3), k=10**5), initial=10**6))
    expect = funcs[0](xs)
    for f in funcs[1:]:
        result = f(xs)
        if result != expect:
            raise Exception(f'{f.__name__} counted {result} instead of {expect}.')

# Speed
times = {f: [] for f in funcs}
def stats(f):
    ts = [t * 1e3 for t in sorted(times[f])[:5]]
    return f'{mean(ts):5.2f} ± {stdev(ts):4.2f} ms '
for _ in range(100):
    random.shuffle(funcs)
    for f in funcs:
        t = time()
        f(xs)
        times[f].append(time() - t)
for f in sorted(funcs, key=stats):
    print(stats(f), f.__name__)

print('\nPython:', sys.version)

Would it be permissible to use numpy?

Hmm, I hadn’t thought of that. Yes, I’ll allow it. And the ATO site does have NumPy.

This probably isn’t very different from your reference solution:

def maxes(xs):
    n = 1
    x_max = xs[0]
    for x in xs[1:]:
        if x > x_max:
            n += 1
            x_max = x
    return n

Yes, that’s similar, and correct except when the input is empty (where it should return 0 instead of crashing).
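
One way to handle the empty case is to guard before indexing. This is only a minimal sketch; the name maxes_guarded is made up for illustration and is not one of the benchmarked functions.

```python
def maxes_guarded(xs):
    """Count values larger than all previous ones; 0 for empty input."""
    if not xs:              # avoid xs[0] raising IndexError on an empty tuple
        return 0
    n = 1                   # the first value is always a new maximum
    x_max = xs[0]
    for x in xs[1:]:
        if x > x_max:
            n += 1
            x_max = x
    return n


print(maxes_guarded(()))                        # 0
print(maxes_guarded((3, 1, 4, 1, 5, 9, 2, 6)))  # 4
```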

Fair observation :-), this is what I get for coding on my phone.

That solution scans the list once and only does a single compare on each element. Short of calling out to compiled code, I’ll be curious if any real speedups are possible.

I tried leveraging numpy’s np.diff to filter entries that are lower than the immediately preceding ones, but it was ~50% slower than the pure python solution I posted.

Pandas has cummax and nunique methods on Series that make this problem trivial, but it’s not on ATO. I’ll try it out when I’m on a real computer to see if that provides a speedup, or if the cost of creating two Series outweighs the benefits.

Looks like polars has similar methods, which should at least save materializing the intermediate series.

That is an interesting challenge. For different sizes of xs or different values in range(-1, 3) different solutions will be optimal. For this challenge conversion of the input data xs to numpy in the naive way (e.g. np.array(xs)) already takes more time than solving with

def maxes(xs):
    n = 0
    x_max = -1
    for x in xs:
        if x > x_max:
            n += 1
            x_max = x
    return n

(with some tricks you can get it into numpy faster, but even then it takes quite a bit of time)

On the numpy path: should the solution be able to handle big ints? Those are not in the input data generated by the benchmark script. Having to deal with big ints (e.g. ints outside the 64-bit range) would probably rule out numpy as a possible solution.

Update: here is a numpy solution that is about twice as fast:

import array
import numpy as np
m3 = np.array([-1, -1, -1], dtype=np.int32)

def maxes(xs):
    a = np.array(array.array('I', xs)).astype(np.int32)
    prev = -1
    while True:
        bb = np.concatenate( (m3, a))
        good = a > bb[2:-1]      
        np.logical_and(a > bb[1:-2], good, out=good)
        np.logical_and(a > bb[:-3], good, out=good)        
        a = a.compress(good)
        if a.size == prev:
            return a.size
        prev = a.size

Nice to see that array.array is a fast way into NumPy, although I wonder why np.array(xs) doesn’t do that for us. (For a list of ints <256 I likewise once used bytes as intermediary, that also worked very well)

Yes, int32 is fine for this. I am however conflicted about the reliance on the particular structure of the benchmark data (which I had hoped would be close to worst case). If I prepend a single large value with xs = (10**7,) + tuple(...), then that solution takes about 850 ms instead of about 3 ms. I suspect there’s a NumPy solution that’s always that fast.
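
A rough pure-Python model of the passes in that compress solution may help show where the slowdown comes from. compress_passes is a hypothetical helper, not part of the posted code: it keeps a value only if it beats the (up to) three values before it, and repeats until a pass removes nothing. With a large value in front, each pass can only clear the few values directly behind it, so the number of passes presumably grows with the input length.

```python
def compress_passes(xs, window=3):
    """Return (count of maxes, number of passes) for the filter-and-repeat idea."""
    a = list(xs)
    passes = 0
    while True:
        passes += 1
        # Keep a[i] only if it is larger than each of the previous `window` values.
        kept = [x for i, x in enumerate(a)
                if all(x > y for y in a[max(0, i - window):i])]
        if len(kept) == len(a):     # nothing removed: only running maxes remain
            return len(a), passes
        a = kept


print(compress_passes((3, 1, 4, 1, 5, 9, 2, 6)))     # (4, 2)
print(compress_passes(tuple(range(1, 8))))           # (7, 1)
print(compress_passes((10,) + tuple(range(1, 8))))   # (1, 4)
```

In the last call the leading 10 forces several passes even though the input is tiny, because each pass only removes the values that have 10 within their window.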

Here is one that should perform better for different types of data (this is the cummax suggestion from @kapinga, but without the unique which is really slow on numpy):

import array
import numpy as np

def maxes(xs):
    a = np.array(array.array('I', xs))
    b = np.maximum.accumulate(a)
    return (np.diff(b)>0).sum() + (a.size>0)    

I… didn’t know about (or had forgotten) that numpy ufuncs can accumulate!

I’ve poked around and can’t create anything faster than @eendebakpt’s latest function. Using polars gets very close, but consistently a tad slower (e.g. 1.46 ms vs 1.79 ms on my laptop):

import polars as pl

def maxes(xs):
    df = pl.DataFrame({"data": xs}, schema={"data": pl.Int64()})
    n = df.select(pl.col("data").cum_max().n_unique())
    return n["data"][0]

Interestingly, polars is faster with the direct tuple input. Converting to an array.array slows things down noticeably.

Also interestingly, prepending a single large value at the front causes my initial “pure python” solution to be just as fast as the numpy and polars solutions (all are about 1.8 ms). I presume the simple scan of the data is what dominates in all cases.

I also played with expanding the input to be 10x larger and allow 10x larger jumps between points (e.g. xs = tuple(accumulate(random.choices(range(-10, 30), k=10**6), initial=10**6))). This causes the polars solution to consistently beat the array.array → np.ndarray solution by 40% (15 ms vs 25 ms).


This might be more fun in C :wink:. I don’t think we’ve seen anything yet in pure Python that improves over “the obvious” approach.

In a low-level language, close to the HW, @Stefan2’s test data creates headaches because there are so many new maximums encountered. This makes HW branch prediction useless, and mispredicted branches have major bad effects on speed.

So, in something like C, I’d try to trick the compiler into generating branch-free “conditional move” instructions instead. Like

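    int max = -1;   /* inputs are positive, so -1 is below every value */
    int n = 0;      /* number of new maximums seen so far */
    for (int i = 0; i < len_of_xs; ++i) {
        int x = xs[i];
        int newmax = x > max;       /* 1 if x is a new maximum, else 0 */
        n += newmax;
        max = newmax ? x : max;     /* candidate for a conditional move */
    }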

Some compilers, with some options, will generate a conditional move instruction for the ternary ?: operator if the subexpressions are simple enough. Then there are no branches remaining except for the highly predictable “is the loop done yet?” implicit branch.

Python is too far from the HW for tricks like that to have any chance of working - the ordinary operation of the eval loop is bristling with branches of its own, and Python’s ternary “… if … else …” expression has 0 chance of turning into a conditional move (although it might in some highly hypothetical future).
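
Purely to show the arithmetic of that loop, here is a direct Python transcription. It is a sketch of the logic only: as said above, there is no reason to expect it to be faster in CPython, and maxes_branch_free_style is a made-up name.

```python
def maxes_branch_free_style(xs):
    biggest = -1            # inputs are positive, so -1 is below every value
    n = 0
    for x in xs:
        newmax = x > biggest                # a bool, which adds like 0 or 1
        n += newmax
        biggest = x if newmax else biggest  # the counterpart of C's ?: operator
    return n


print(maxes_branch_free_style((3, 1, 4, 1, 5, 9, 2, 6)))   # 4
```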


I think it’s fun in Python as well. I have several pure Python ways that are a bit faster, including one I really like. (Not as fast as the fast NumPy way, though.)


Here is a pure Python solution that is a tiny bit faster (by avoiding construction of ints in n += 1)

import itertools

def maxes_3(xs):
    x_max = -1
    c = itertools.repeat(None, len(xs))
    for x in xs:
        if x > x_max:
            next(c)
            x_max = x
    return len(xs) - c.__length_hint__()

The core (and the bulk of computation time) is still the same though: iterate over xs and compare to x_max.

Side question: why does itertools.repeat have a length hint, but not a length?


Ha, good one. And for _ in c: break instead of next(c) seems to slightly speed it up further. Most of my own optimizations are also avoiding construction of ints in various ways.

As far as I know, length hints are for iterators and lengths are not. I think it’s because of the “volatile” nature of iterators.
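
A small demonstration of that difference, using operator.length_hint (the public way to read __length_hint__). The count of 5 is arbitrary.

```python
from itertools import repeat
from operator import length_hint

c = repeat(None, 5)
print(length_hint(c))       # 5

next(c)                     # consume one item, as in the posted solution
for _ in c: break           # consumes one more item, the variant mentioned above
print(length_hint(c))       # 3

try:
    len(c)                  # iterators like this one do not define __len__
except TypeError as exc:
    print("len() fails:", exc)
```

The number of consumed items is then the original count minus the remaining hint, which is what len(xs) - c.__length_hint__() computes.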

I’m… shocked ! … I’m seeing that np.array(array.array('I', xs)) is faster than np.asarray(xs, dtype=int)… Never knew that.

There is still a bit of time to gain over np.sum:

import array
import numpy as np

def maxes(xs):
    a = np.array(array.array('I', xs))
    b = np.maximum.accumulate(a)
    return np.count_nonzero(np.diff(b)) + (a.size > 0)

Ok, here are the results and my own solutions. I modified the benchmark script slightly: it’s still the same benchmark, but run several times.

 1.67 1.66 1.71 hprodh_numpy_accumulate
 1.76 1.81 1.79 eendebakpt_numpy_accumulate
 2.62 2.63 2.67 eendebakpt_numpy_compress
 3.60 3.59 3.62 Stefan2_nested_loops_float
 3.85 3.86 3.90 Stefan2_nested_loops
 3.94 3.97 3.97 Stefan2_float
 4.09 4.09 4.09 Stefan2_hi_lo
 4.13 4.15 4.12 eendebakpt_lengthhint_for
 4.16 4.17 4.16 eendebakpt_lengthhint
 4.30 4.31 4.30 Stefan2_pairs_even
 4.38 4.42 4.43 Stefan2_loop
 4.41 4.41 4.43 eendebakpt_loop
 4.93 4.94 4.99 kapinga_loop
 8.02 8.08 8.09 Stefan2_accumulate

Python: 3.13.0 (main, Nov  9 2024, 10:04:25) [GCC 14.2.1 20240910]

Full script and output are at the end of this post.

Now my solutions:

Straightforward loop

My own starting point was the straightforward loop as well:

def maxes_Stefan2_loop(xs):
    count = 0
    X = 0
    for x in xs:
        if x > X:
            X = x
            count += 1
    return count

The for x in xs and if x > X are rather essential and the X = x is fast, but the count += 1 has room for improvement.

Floats are faster than ints

This one is dirty, as it can “only” count up to 2**53 = 9,007,199,254,740,992.

def maxes_Stefan2_float(xs):
    count = 0.
    X = 0
    for x in xs:
        if x > X:
            X = x
            count += 1.
    return int(count)
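
The limit comes from the 53-bit precision of Python floats (IEEE 754 doubles): beyond it, adding 1. no longer changes the value. A quick check:

```python
limit = 2.0 ** 53

print(int(limit))                    # 9007199254740992
print(limit - 1.0 + 1.0 == limit)    # True: counting is still exact up to the limit
print(limit + 1.0 == limit)          # True: the next increment is silently lost
```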

Small ints (-5 to 256) are faster

CPython has them cached and doesn’t create new int objects for them. So I use two counters: one somewhat for the low 8 bits, and one for the higher bits which updates only rarely. The low counter goes backwards because it’s fast to check whether it’s 0.

def maxes_Stefan2_hi_lo(xs):
    X = 0
    hi = lo = 256
    for x in xs:
        if x > X:
            X = x
            lo -= 1
            if not lo:
                lo = 256
                hi += 256
    return hi - lo
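
To see that hi - lo really equals the number of maxes found, the counter updates can be run in isolation. hi_lo_after is a hypothetical helper that applies the same update k times without any input data.

```python
def hi_lo_after(k):
    """Apply the hi/lo counter update k times and return both counters."""
    hi = lo = 256
    for _ in range(k):
        lo -= 1             # lo stays within 1..256, i.e. cached small ints
        if not lo:          # every 256th update: refill lo and bump hi
            lo = 256
            hi += 256
    return hi, lo


for k in (0, 1, 255, 256, 257, 1000):
    hi, lo = hi_lo_after(k)
    print(k, hi, lo, hi - lo)
```

For k = 1000 this ends with hi = 1024 and lo = 24, so hi - lo is 1000 again.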

Update counter only every second time

Count not the individual maxes but pairs of them, so we create a new int object only half as often. Here I use a bool flag to know whether I have seen an even or odd number of maxes.

def maxes_Stefan2_pairs_even(xs):
    X = 0
    pairs, even = 0, True
    for x in xs:
        if x > X:
            X = x
            if even:
                even = False
            else:
                pairs += 1
                even = True
    return 2 * pairs + (not even)

Two states via nested loops

Again count only every second max. My two loops represent two program states: In the first=outer loop, X is the max so far and I’m looking for an even larger x. In the second=inner loop, x is the max so far and I’m looking for an even larger X. Whenever a new max is found, I switch to the other state by running into or dropping out of the inner loop. Besides doing count += only half as often, I also don’t need an X = x anymore.

def maxes_Stefan2_nested_loops(xs):
    X = 0
    count = 0
    xs = iter(xs)
    for x in xs:
        if x > X:
            for X in xs:
                if X > x:
                    count += 2
                    break
            else:
                return count + 1
    return count
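
Since the control flow is easy to get wrong, a randomized cross-check against the straightforward loop is reassuring. This sketch repeats both functions so that it runs on its own; the seed, value range and lengths are arbitrary choices.

```python
import random


def maxes_loop(xs):
    count = 0
    X = 0
    for x in xs:
        if x > X:
            X = x
            count += 1
    return count


def maxes_nested_loops(xs):
    X = 0
    count = 0
    xs = iter(xs)
    for x in xs:                # state 1: X is the max so far
        if x > X:
            for X in xs:        # state 2: x is the max so far
                if X > x:
                    count += 2  # two maxes found since the last update
                    break
            else:
                return count + 1    # input ended with one uncounted max
    return count


random.seed(0)
for _ in range(1000):
    xs = tuple(random.choices(range(1, 10), k=random.randrange(8)))
    assert maxes_nested_loops(xs) == maxes_loop(xs), xs
print("all agree")
```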

I’ve also used this “two states via nested loops” for a few other things, for example for merging two sorted iterables (see the merge_nested_states function in the code there).
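
To give an idea of how the same technique can look for merging, here is a rough sketch. It is not the linked merge_nested_states function; merge_two_states is a hypothetical name and this version was written only for illustration. In the outer loop x is the held value and ys is being read; in the inner loop y is held and xs is being read.

```python
def merge_two_states(xs, ys):
    """Merge two sorted iterables, switching state by entering/leaving a loop."""
    xs, ys = iter(xs), iter(ys)
    for x in xs:                # fetch the first x, if there is one
        break
    else:
        yield from ys
        return
    for y in ys:                # state A: holding x, reading ys
        if y < x:
            yield y
        else:
            yield x
            for x in xs:        # state B: holding y, reading xs
                if x <= y:
                    yield x
                else:
                    yield y
                    break       # back to state A with the new x
            else:
                yield y         # xs is exhausted
                yield from ys
                return
    yield x                     # ys is exhausted
    yield from xs


print(list(merge_two_states([1, 4, 6], [2, 3, 5, 7])))   # [1, 2, 3, 4, 5, 6, 7]
```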

Same but with floats

Just dirty with floats again, and since I count only pairs, it works up to 2**54.

def maxes_Stefan2_nested_loops_float(xs):
    X = 0
    pairs = 0.
    xs = iter(xs)
    for x in xs:
        if x > X:
            for X in xs:
                if X > x:
                    pairs += 1.
                    break
            else:
                return 2 * int(pairs) + 1
    return 2 * int(pairs)

Pretty

from itertools import accumulate

def maxes_Stefan2_accumulate(xs):
    return len(set(accumulate(xs, max)))
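
Spelled out on the example from the first post: accumulate(xs, max) yields the running maximum, which only changes when a new maximum appears, so the number of distinct values in it is the answer.

```python
from itertools import accumulate

xs = (3, 1, 4, 1, 5, 9, 2, 6)
running = list(accumulate(xs, max))

print(running)                # [3, 3, 4, 4, 5, 9, 9, 9]
print(sorted(set(running)))   # [3, 4, 5, 9]
print(len(set(running)))      # 4
```

An empty input gives an empty running list and therefore 0.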

Full script and output

Script
def maxes_kapinga_loop(xs):
    n = 1
    x_max = xs[0]
    for x in xs[1:]:
        if x > x_max:
            n += 1
            x_max = x
    return n


def maxes_eendebakpt_loop(xs):
    n = 0
    x_max = -1
    for x in xs:
        if x > x_max:
            n += 1
            x_max = x
    return n


import array
import numpy as np
m3 = np.array([-1, -1, -1], dtype=np.int32)

def maxes_eendebakpt_numpy_compress(xs):
    a = np.array(array.array('I', xs)).astype(np.int32)
    prev = -1
    while True:
        bb = np.concatenate( (m3, a))
        good = a > bb[2:-1]      
        np.logical_and(a > bb[1:-2], good, out=good)
        np.logical_and(a > bb[:-3], good, out=good)        
        a = a.compress(good)
        if a.size == prev:
            return a.size
        prev = a.size

def maxes_eendebakpt_numpy_accumulate(xs):
    a = np.array(array.array('I', xs))
    b = np.maximum.accumulate(a)
    return (np.diff(b)>0).sum() + (a.size>0)


import itertools

def maxes_eendebakpt_lengthhint(xs):
    x_max = -1
    c = itertools.repeat(None, len(xs))
    for x in xs:
        if x > x_max:
            next(c)
            x_max = x
    return len(xs) - c.__length_hint__()


def maxes_eendebakpt_lengthhint_for(xs):
    x_max = -1
    c = itertools.repeat(None, len(xs))
    for x in xs:
        if x > x_max:
            for _ in c: break
            x_max = x
    return len(xs) - c.__length_hint__()


def maxes_hprodh_numpy_accumulate(xs):
    a = np.array(array.array('I', xs))
    b = np.maximum.accumulate(a)
    return np.count_nonzero(np.diff(b)) + (a.size > 0)


def maxes_Stefan2_loop(xs):
    count = 0
    X = 0
    for x in xs:
        if x > X:
            X = x
            count += 1
    return count


def maxes_Stefan2_float(xs):
    count = 0.
    X = 0
    for x in xs:
        if x > X:
            X = x
            count += 1.
    return int(count)


def maxes_Stefan2_hi_lo(xs):
    X = 0
    hi = lo = 256
    for x in xs:
        if x > X:
            X = x
            lo -= 1
            if not lo:
                lo = 256
                hi += 256
    return hi - lo


def maxes_Stefan2_pairs_even(xs):
    X = 0
    pairs, even = 0, True
    for x in xs:
        if x > X:
            X = x
            if even:
                even = False
            else:
                pairs += 1
                even = True
    return 2 * pairs + (not even)


def maxes_Stefan2_nested_loops(xs):
    X = 0
    count = 0
    xs = iter(xs)
    for x in xs:
        if x > X:
            for X in xs:
                if X > x:
                    count += 2
                    break
            else:
                return count + 1
    return count


def maxes_Stefan2_nested_loops_float(xs):
    X = 0
    pairs = 0.
    xs = iter(xs)
    for x in xs:
        if x > X:
            for X in xs:
                if X > x:
                    pairs += 1.
                    break
            else:
                return 2 * int(pairs) + 1
    return 2 * int(pairs)


from itertools import accumulate

def maxes_Stefan2_accumulate(xs):
    return len(set(accumulate(xs, max)))


funcs = [
    maxes_kapinga_loop,
    maxes_eendebakpt_loop,
    maxes_eendebakpt_numpy_compress,
    maxes_eendebakpt_numpy_accumulate,
    maxes_eendebakpt_lengthhint,
    maxes_eendebakpt_lengthhint_for,
    maxes_hprodh_numpy_accumulate,
    maxes_Stefan2_loop,
    maxes_Stefan2_float,
    maxes_Stefan2_hi_lo,
    maxes_Stefan2_pairs_even,
    maxes_Stefan2_nested_loops,
    maxes_Stefan2_nested_loops_float,
    maxes_Stefan2_accumulate,
]


from itertools import accumulate
from time import perf_counter as time
from statistics import mean, stdev
import random
import sys

# Reproducibility
seed = random.randrange(10**9)
print(f'{seed=}')
random.seed(seed)

# Correctness
for _ in range(10):
    xs = tuple(accumulate(random.choices(range(-1, 3), k=10**5), initial=10**6))
    expect = funcs[0](xs)
    for f in funcs[1:]:
        result = f(xs)
        if result != expect:
            raise Exception(f'{f.__name__} counted {result} instead of {expect}.')

# Speed
timess = {f: '' for f in funcs}
for i in range(5):
    xs = tuple(accumulate(random.choices(range(-1, 3), k=10**5), initial=10**6))

    times = {f: [] for f in funcs}
    def stats(f):
        ts = [t * 1e3 for t in sorted(times[f])[:5]]
        return f'{mean(ts):5.2f} ± {stdev(ts):4.2f} ms '
    for _ in range(100):
        random.shuffle(funcs)
        for f in funcs:
            t = time()
            f(xs)
            times[f].append(time() - t)
    for f in sorted(funcs, key=stats):
        print(stats(f), f.__name__.removeprefix('maxes_'))
        if i >= 2:
            timess[f] += stats(f)[:5]
    print()

for f in sorted(funcs, key=stats):
    print(timess[f], f.__name__.removeprefix('maxes_'))

print('\nPython:', sys.version)

Attempt This Online!

Output
seed=185826456
 1.72 ± 0.04 ms  hprodh_numpy_accumulate
 1.85 ± 0.03 ms  eendebakpt_numpy_accumulate
 2.74 ± 0.01 ms  eendebakpt_numpy_compress
 3.62 ± 0.01 ms  Stefan2_nested_loops_float
 3.86 ± 0.01 ms  Stefan2_nested_loops
 3.95 ± 0.02 ms  Stefan2_float
 4.11 ± 0.01 ms  Stefan2_hi_lo
 4.13 ± 0.01 ms  eendebakpt_lengthhint_for
 4.14 ± 0.04 ms  eendebakpt_lengthhint
 4.29 ± 0.02 ms  Stefan2_pairs_even
 4.38 ± 0.03 ms  Stefan2_loop
 4.42 ± 0.02 ms  eendebakpt_loop
 4.98 ± 0.04 ms  kapinga_loop
 8.18 ± 0.05 ms  Stefan2_accumulate

 1.69 ± 0.03 ms  hprodh_numpy_accumulate
 1.77 ± 0.04 ms  eendebakpt_numpy_accumulate
 2.64 ± 0.04 ms  eendebakpt_numpy_compress
 3.60 ± 0.01 ms  Stefan2_nested_loops_float
 3.88 ± 0.01 ms  Stefan2_nested_loops
 3.95 ± 0.02 ms  Stefan2_float
 4.08 ± 0.01 ms  Stefan2_hi_lo
 4.11 ± 0.02 ms  eendebakpt_lengthhint_for
 4.15 ± 0.02 ms  eendebakpt_lengthhint
 4.30 ± 0.01 ms  Stefan2_pairs_even
 4.39 ± 0.01 ms  Stefan2_loop
 4.40 ± 0.02 ms  eendebakpt_loop
 4.94 ± 0.04 ms  kapinga_loop
 7.98 ± 0.09 ms  Stefan2_accumulate

 1.67 ± 0.03 ms  hprodh_numpy_accumulate
 1.76 ± 0.02 ms  eendebakpt_numpy_accumulate
 2.62 ± 0.02 ms  eendebakpt_numpy_compress
 3.60 ± 0.02 ms  Stefan2_nested_loops_float
 3.85 ± 0.02 ms  Stefan2_nested_loops
 3.94 ± 0.01 ms  Stefan2_float
 4.09 ± 0.02 ms  Stefan2_hi_lo
 4.13 ± 0.01 ms  eendebakpt_lengthhint_for
 4.16 ± 0.01 ms  eendebakpt_lengthhint
 4.30 ± 0.03 ms  Stefan2_pairs_even
 4.38 ± 0.02 ms  Stefan2_loop
 4.41 ± 0.02 ms  eendebakpt_loop
 4.93 ± 0.06 ms  kapinga_loop
 8.02 ± 0.01 ms  Stefan2_accumulate

 1.66 ± 0.01 ms  hprodh_numpy_accumulate
 1.81 ± 0.02 ms  eendebakpt_numpy_accumulate
 2.63 ± 0.02 ms  eendebakpt_numpy_compress
 3.59 ± 0.02 ms  Stefan2_nested_loops_float
 3.86 ± 0.03 ms  Stefan2_nested_loops
 3.97 ± 0.02 ms  Stefan2_float
 4.09 ± 0.02 ms  Stefan2_hi_lo
 4.15 ± 0.02 ms  eendebakpt_lengthhint_for
 4.17 ± 0.01 ms  eendebakpt_lengthhint
 4.31 ± 0.02 ms  Stefan2_pairs_even
 4.41 ± 0.01 ms  eendebakpt_loop
 4.42 ± 0.01 ms  Stefan2_loop
 4.94 ± 0.05 ms  kapinga_loop
 8.08 ± 0.05 ms  Stefan2_accumulate

 1.71 ± 0.01 ms  hprodh_numpy_accumulate
 1.79 ± 0.01 ms  eendebakpt_numpy_accumulate
 2.67 ± 0.02 ms  eendebakpt_numpy_compress
 3.62 ± 0.01 ms  Stefan2_nested_loops_float
 3.90 ± 0.01 ms  Stefan2_nested_loops
 3.97 ± 0.02 ms  Stefan2_float
 4.09 ± 0.01 ms  Stefan2_hi_lo
 4.12 ± 0.03 ms  eendebakpt_lengthhint_for
 4.16 ± 0.03 ms  eendebakpt_lengthhint
 4.30 ± 0.02 ms  Stefan2_pairs_even
 4.43 ± 0.01 ms  Stefan2_loop
 4.43 ± 0.02 ms  eendebakpt_loop
 4.99 ± 0.05 ms  kapinga_loop
 8.09 ± 0.03 ms  Stefan2_accumulate

 1.67 1.66 1.71 hprodh_numpy_accumulate
 1.76 1.81 1.79 eendebakpt_numpy_accumulate
 2.62 2.63 2.67 eendebakpt_numpy_compress
 3.60 3.59 3.62 Stefan2_nested_loops_float
 3.85 3.86 3.90 Stefan2_nested_loops
 3.94 3.97 3.97 Stefan2_float
 4.09 4.09 4.09 Stefan2_hi_lo
 4.13 4.15 4.12 eendebakpt_lengthhint_for
 4.16 4.17 4.16 eendebakpt_lengthhint
 4.30 4.31 4.30 Stefan2_pairs_even
 4.38 4.42 4.43 Stefan2_loop
 4.41 4.41 4.43 eendebakpt_loop
 4.93 4.94 4.99 kapinga_loop
 8.02 8.08 8.09 Stefan2_accumulate

Python: 3.13.0 (main, Nov  9 2024, 10:04:25) [GCC 14.2.1 20240910]

I stand corrected again - this is a lot of fun in Python! Some of those approaches are delightfully perverse :smile:.


You can shave a tiny bit off @hprodh’s solution by skipping an assignment:

import array
import numpy as np

def maxes_hprodh_numpy_accumulate(xs):
    b = np.maximum.accumulate(array.array('I', xs))
    return np.count_nonzero(np.diff(b)) + (len(xs) > 0)

@jamestwebber Indeed looks ~0.05 ms faster. An assignment itself only takes a few nanoseconds, but you also removed the np.array call.

@tim.one Yeah, I thought you might like at least the nested loops solution, since @dg-pb and you mentioned you liked my merge solution. That was actually why I created the challenge, to share that technique again.

Oh and full disclosure: I earlier did this at CheckiO (same task but with reverse data).
