Incremental Combination Generation from Laggy Generators

Hi everyone,

I’m working on a spare-time dataflow pipeline project and ran into an issue when trying to process combinations of elements from generators that yield results incrementally (for example, items as they are downloaded from the internet or produced by multithreaded/multiprocessing workflows).

Ideally, I’d like to start generating and processing combinations (using functions like itertools.combinations, combinations_with_replacement, or itertools.product) as soon as the first few items are available. However, it seems that these functions require the entire input iterable to be collected (often converting it to a tuple) before they begin yielding any results. This behavior blocks the pipeline until all elements are available, which could lead to performance bottlenecks in scenarios where data arrives gradually.
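A minimal illustration of what I mean (the generator here is just a stand-in for a slow network source):

```python
import itertools

def slow_source():
    # Stand-in for a laggy source, e.g. items arriving from the network.
    for i in range(3):
        print(f'produced {i}')
        yield i

# All of the 'produced ...' lines are printed here, at construction time:
# product() materializes its inputs into tuples before yielding anything.
pairs = itertools.product(slow_source(), 'ab')
print('product() returned; only now does iteration start')
for p in pairs:
    print(p)
```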

Has anyone encountered this issue before? Are there any existing solutions—perhaps trivial workarounds—that allow for more incremental combination generation with laggy generators? I’ve considered approaches like manual buffering or writing a custom iterator that yields new combinations as data arrives, but I’m wondering if there’s a more straightforward solution or design pattern that I might be overlooking.

I’d appreciate any insights or suggestions!

Thanks in advance,

Ben


It looks like the problem is that itertools.product fully consumes its iterables before it starts to yield results. I found a discussion from a few years ago about adding another function that would yield results incrementally, but it doesn’t look like it went anywhere. You could look at the suggested implementation and see if it works for you; it was also noted that sympy has a function that can do this.

In general, though, I think the answer is that there isn’t a particular design pattern that you’re missing. If itertools doesn’t quite work the way you need it to, you may need to write your own version of its functions, or find other implementations that do work for you.
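For what it's worth, a hand-rolled incremental product is not too bad. Here is a sketch (my own, not the implementation from that discussion or sympy's function) that pulls from its inputs round-robin and yields each tuple as soon as all of its components have been seen:

```python
import itertools

def incremental_product(*iterables):
    # Sketch of an incremental itertools.product: pull from the inputs
    # round-robin and yield each tuple as soon as its last component
    # arrives. Output order differs from itertools.product.
    iterators = [iter(it) for it in iterables]
    seen = [[] for _ in iterables]          # values pulled so far, per input
    active = list(range(len(iterables)))    # inputs not yet exhausted
    while active:
        for i in list(active):
            try:
                x = next(iterators[i])
            except StopIteration:
                active.remove(i)
                continue
            # New tuples made possible by x: everything already seen in
            # the other positions, with x in position i.
            for front in itertools.product(*seen[:i]):
                for back in itertools.product(*seen[i + 1:]):
                    yield front + (x,) + back
            seen[i].append(x)

# Example: tuples start flowing before either input is exhausted.
for t in incremental_product([1, 2], 'ab'):
    print(t)
```

Each tuple is yielded exactly once, when its last-arriving component is pulled; everything pulled earlier is already buffered in `seen`.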

Hello,

You can do this already. As soon as new values are available from the generator, append them to the existing tuple or list before the next round of combination generation (create a new list, copy the previous values, and append the latest ones). The catch is that previously generated combinations will be repeated in subsequent runs.

So that you do not repeat post-processing for combinations you have already handled, loop through the most recently generated combination list and compare it against the list of combinations processed previously.

Here is a simple example where the first available values from a generator are passed to the itertools function to generate combinations. You can then process the existing combinations while you wait for additional generator values. Once new values arrive, append them to the existing list and rerun itertools.combinations. Then loop through the latest combinations and compare them with those already processed, so that you don’t repeat combo post-processing.

One requirement: the number of values available from the generator must be greater than or equal to the desired combination length.

Here is an example:

import itertools
from collections import deque

def process_queue_combinations(queue, comb_length):
    return list(itertools.combinations(queue, comb_length))

combination_length = 3

print('\nGenerate combinations with the initial available list ...')
# Process first version
my_queue_1 = deque([1, 2, 3, 4])  # Simulate available values from the generator
combinations_1 = process_queue_combinations(my_queue_1, combination_length)
for combo in combinations_1:
    print(combo)

print('\nGenerate combinations on the updated incremented list ...')
# Process updated version
my_queue_2 = deque([1, 2, 3, 4, 5, 6])
combinations_2 = process_queue_combinations(my_queue_2, combination_length)
for combo in combinations_2:
    print(combo)

print('\nBegin processing the difference as tuple has new values from the generator ...')
# Process only the difference - those of which have not yet been processed
for combo in combinations_2:
    if combo not in combinations_1:
        print(combo)

Is this what you had in mind? Did I read it right?

Each new value x can produce new combinations that you can process before going to the next value. In other words: always produce all combinations that are already possible with the values you got so far.

import itertools

def combinations(iterable, r):
    if not r:
        yield ()
        return
    previous = []
    for x in iterable:
        # Every r-combination ending in x pairs x with an
        # (r-1)-combination of the values seen before it.
        for comb in itertools.combinations(previous, r - 1):
            yield comb + (x,)
        previous.append(x)
Attempt This Online!
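For example, driving it with a simulated laggy source (a sketch; `slow_values` just sleeps to stand in for real lag), combinations appear long before the source is exhausted:

```python
import itertools
import time

def combinations(iterable, r):
    # Incremental version from above: yields each combination as soon
    # as its last element arrives.
    if not r:
        yield ()
        return
    previous = []
    for x in iterable:
        for comb in itertools.combinations(previous, r - 1):
            yield comb + (x,)
        previous.append(x)

def slow_values():
    # Hypothetical stand-in for a laggy generator.
    for i in range(1, 6):
        time.sleep(0.1)  # simulate waiting on I/O
        yield i

for combo in combinations(slow_values(), 3):
    print(combo)
```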

Hello,

I worked out a more general solution to this problem. Per your requirement, you have an initial set of values that you would like to obtain combinations potentially using itertools.combinations (as an example). You also stated that you would need to obtain results incrementally per “laggy generators” that don’t provide the values all at once.

To use this class, first create an instance, passing the required combination length as an argument. To obtain combination values, call the non_repeat_combos method with the current values in list form. As new values become available via the laggy generators, append them to the existing list of values and call the method again as needed. Note that combinations produced by earlier calls are not repeated: each call returns only the new combinations made possible by the latest values appended to the list.

import itertools

# Use this class and method to pass in values incrementally
# as new values become available:
class IncrementalComboGenerator:

    def __init__(self, combo_length):
        self.combo_length = combo_length
        self.all_combos = []

    def non_repeat_combos(self, latest_list):
        # Regenerate the combinations for the current list, then keep
        # only those not returned by any previous call.
        latest_combos = itertools.combinations(latest_list, self.combo_length)
        combo_diff = [combo for combo in latest_combos
                      if combo not in self.all_combos]
        self.all_combos.extend(combo_diff)
        return combo_diff

# Create instance - provide combination length
combos = IncrementalComboGenerator(3)        

# Obtain combinations incrementally as you're obtaining new values
print(combos.non_repeat_combos([1,2,3,4]))        # Original values
print(combos.non_repeat_combos([1,2,3,4,5]))      # append new value 5
print(combos.non_repeat_combos([1,2,3,4,5,6]))    # append new value 6
print(combos.non_repeat_combos([1,2,3,4,5,6,7]))  # append new value 7

The following function processes values provided all at once.

import itertools
from collections import deque

def process_queue_combinations(queue, comb_length):
    return list(itertools.combinations(queue, comb_length))

combination_length = 3

# Simulate available values from the generator
my_queue_1 = deque([1, 2, 3, 4, 5, 6, 7]) 

combinations_1 = process_queue_combinations(my_queue_1, combination_length)
for combo in combinations_1:
    print(combo)

print(len(combinations_1))

If you compare the combinations produced by the function when values are passed all at once to the instance method where values are passed incrementally via multiple method calls, you will observe that they produce the same combinations.
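To make that comparison concrete, here is a compact sketch (a set-based variant of the same idea, not the class above) that grows the input one value at a time and checks the result against the all-at-once combinations:

```python
import itertools

def non_repeat_combos(seen, latest_list, r):
    # Set-based variant: return only combinations not already in
    # `seen`, and record them so later calls skip them.
    new = [c for c in itertools.combinations(latest_list, r) if c not in seen]
    seen.update(new)
    return new

values = [1, 2, 3, 4, 5, 6, 7]
seen = set()
incremental = []
for n in range(4, len(values) + 1):   # grow the list one value at a time
    incremental += non_repeat_combos(seen, values[:n], 3)

all_at_once = list(itertools.combinations(values, 3))
print(sorted(incremental) == sorted(all_at_once))  # prints True
```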

It is a nice problem. I found a way to extend the combinations without storing the previous combinations to check against, and without yielding any combination twice:

@Paddy3118 Looks quite complicated and inefficient (especially memory-wise).

Comparison with 100 earlier items, 100 new items, and r=3:

1311 ms  extra_combs_Paddy3118
 117 ms  extra_combs_Stefan

164,748,320 bytes  extra_combs_Paddy3118
      3,832 bytes  extra_combs_Stefan
Code
from functools import reduce
from itertools import combinations
from typing import Any, Iterable
from collections import deque


def extra_combs_Paddy3118(orig='abcd', r=3, delta='ef'):
    C = combinations  # Less typing
    extra_n = tuple(delta)  # n..n+x inclusive
    n = tuple(orig)
    n_r_terms = (extend_muli((C(n, r-i) if r-i > 0 else {()}),   # extend this
                             tuple(C(extra_n, i)))               # by this
                for i in range(1, min(len(extra_n), r) + 1))     # miss C(n, r)
    reduction = reduce(set.union, n_r_terms, set())
    # set(C(n+extra_n, r)) - reduction == set(C(n, r))
    return reduction


def extend_muli(s: set[tuple[Any]], e: Iterable) -> set[tuple[Any]]:
    """
    set where each tuple of s is extended in turn, by every *item in e

    s = {(1, 2, 3), (1, 2, 4), (1, 3, 4), (2, 3, 4)}
    extend_muli(s, ((5,), (6,))) == {(1, 2, 3, 5), (1, 2, 3, 6),
                              (1, 2, 4, 5), (1, 2, 4, 6),
                              (1, 3, 4, 5), (1, 3, 4, 6),
                              (2, 3, 4, 5), (2, 3, 4, 6)}
    """
    return {(*t, *item) for t in s for item in e}


def extra_combs_Stefan(orig='abcd', r=3, delta='ef'):
    if r:
        n = list(orig)
        for d in zip(delta):
            for c in combinations(n, r - 1):
                yield c + d
            n += d


consume = deque(maxlen=0).extend
funcs = extra_combs_Paddy3118, extra_combs_Stefan
args = range(100), 3, range(100, 200)

print(sorted(funcs[0](*args)) == sorted(funcs[1](*args)))

from time import time
for f in funcs * 2:
    t = time()
    consume(f(*args))
    print(f'{(time()-t)*1e3:4.0f} ms ', f.__name__)

import tracemalloc
for f in funcs * 2:
    tracemalloc.start()
    consume(f(*args))
    peak = tracemalloc.get_traced_memory()[1]
    tracemalloc.stop()
    print(f'{peak:11,} bytes ', f.__name__)

Attempt This Online!

Thanks for going easy on me! Yours is spectacularly the better solution, in both time and space. It’s hardly a bust for me, though, as it is hugely gratifying to find the patterns, and I’ll keep a note of the method for future problem solving. I already have notes on being humble when coding: more than a decade on RosettaCode.org let me see many examples, like yours, of great problem solving.
Maybe… maybe I could start from the iterative combinations function, understand the indices generation and modify that… ?

  • Another time :smile:

I wound up creating a Python package called nonblocking_itertools that implements the five itertools functions that have this issue. It is unfortunately a lot slower than the equivalent itertools functions when all the data is available from the start, so I would only use it in situations where lag is expected to be the dominant runtime factor.
