How do you use data structures in your concurrent programs?

Hello there :wave:

I’m looking for ways in which folks are using data structures, be it a dictionary, a list, a queue, or anything else.
I’m especially interested in concurrent programs, where you may be using threads, coroutines, processes, or subinterpreters.

If you’re using concurrent data structures in C/C++/Rust, I’m interested in that too!

This is not specifically bound to Python or CPython; it’s a survey for a library I’m developing for atomic data structures.
I’m trying to understand where to direct my next efforts.

Specifically, I’d be interested in:

  • what is the computation/purpose of your program?
  • why do you need concurrency/parallelism?
  • what objects are you sharing among threads/coroutines/processes/subinterpreters?
  • what sort of obstacles did you encounter in managing the shared state?
  • links to source code would be nice if you can share them :pray:

If you had the time to tell me a bit about how you’re using data structures in your concurrent programs, I’d love to hear it! :lotus:

Is this a research project? Who for?
Do you plan to publish results?

It’s sort of a research project, yeah; nothing academic, if that’s what you have in mind.

My current plan is to more or less copy the interfaces of Java’s atomics for my Python package, but I’d like to understand whether the Python ecosystem’s needs are different enough to warrant different interfaces.

I guess the survey/research is just for cereggii, but it’s possible that others would be interested.
If I can gather a good enough sample and there’s interest I’ll try to put the results somewhere.

As I understand it, there is no need for atomics in pure Python code.

Atomics will be needed inside the implementation of free-threaded Python.

Extensions using the C API will need to follow the free-threading rules to access Python objects without threading issues, data races, etc.
Understanding this is on my todo list.

Responses to your bullet points:

  • Data manipulation
  • Parallelizable
  • References
  • None
  • Not authorized for disclosure

I adhere closely to the strict definition of data structures and don’t see synchronization issues specifically related to them.

For instance, if a hash table is atomic, that alone doesn’t ensure data integrity. A race condition can still occur in your algorithm, where atomicity wouldn’t resolve the issue. This approach only works if you’re okay with ‘first-come, first-served’ outcomes. Otherwise, synchronization issues should be addressed directly within your algorithms. I don’t see atomicity as being related to synchronization.
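To illustrate that point with a small sketch (my illustration, not tied to any particular library): even if every individual dictionary operation is atomic, a check-then-act sequence still races and needs its own synchronization around the whole sequence.

```python
import threading

# A check-then-act race: even if each dict operation is atomic on its own,
# the *sequence* "check membership, then insert" is not.
results = {}
lock = threading.Lock()

def claim(worker_id, key):
    # Unsafe version (commented out): another thread could insert the key
    # between the membership test and the assignment.
    #   if key not in results:
    #       results[key] = worker_id
    # Safe version: make the whole check-then-act one critical section.
    with lock:
        if key not in results:
            results[key] = worker_id
            return True
        return False

threads = [threading.Thread(target=claim, args=(i, "job-1")) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert len(results) == 1  # exactly one worker claimed the job
```

With first-come-first-served semantics this outcome may be fine; otherwise the algorithm itself has to own the synchronization, which is the point above.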

In one project at work I’m putting all the shared data into a multiprocessing manager that owns the data, and then each individual Flask request accesses that data. It’s neat and hacky, and parts of it should probably be database calls instead.

@barry-scott If you’re interested, I gave a talk about this at PyCon this year: Python Software Foundation News: The Python Language Summit 2024: Free-threading ecosystems


@elis.byberi what were the main pain points in implementing that?
For instance, there’s an AtomicDict.reduce method in my library for solving the issues you mentioned, not strictly related to the atomicity of the hash table itself.
Would the implementation have been easier with this?


@ajoino what is the goal of the system? Is it a data processing application?

You have a core process that exposes a flask server and when a worker is free it requests data from the core process, am I getting this right?

I think this thread should be moved to the Help category (as that is the general forum for non-Ideas things).

I strongly agree.

Just yesterday at work, a coworker showed my team a specialized thread pool executor, subclassed from concurrent.futures, with special properties appropriate to the use case we had. In particular, it exposed interfaces which let us ensure that we have an available thread in the pool before pulling work from an external queue, which was important in that application.
It was clever and well designed, and had nothing to do with the data structures being used.


I’m not sure I understand the OP’s questions or expected outcome here. I’ll try to answer…
(I mean, I sort of get that the ideal answer would be directed at the library they’re building, but when I read the questions I don’t see how to map them onto such an answer.)

When I need to synchronize threads, I use the primitives provided by threading. If I’m designing an abstraction and synchronization guarantees are part of the interface, it’s always use case specific. I try to avoid program structures which require synchronization, and only sync when it’s really necessary, usually when accessing external resources (like console output).

Most recently I’ve started to consider using immutable structures, like pyrsistent and py-rpds, for more use cases. Immutability, applied well, can allow for concurrent workers which have little to no need to synchronize with one another.
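A stdlib-only sketch of that idea (frozen dataclasses standing in here for pyrsistent/py-rpds structures): workers share an immutable snapshot and derive local variants, so no synchronization is needed for reads.

```python
from dataclasses import dataclass, replace
from concurrent.futures import ThreadPoolExecutor

# Immutable snapshot: workers can read it freely with no locking,
# because nothing can mutate it in place.
@dataclass(frozen=True)
class Config:
    threshold: int
    label: str

base = Config(threshold=10, label="base")

def work(cfg: Config, x: int) -> bool:
    # Each worker derives its own variant instead of mutating shared state.
    local = replace(cfg, threshold=cfg.threshold + x)
    return local.threshold > 12

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(lambda x: work(base, x), range(5)))

assert results == [False, False, False, True, True]
```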

I’ve written many programs over many years at this point. I learn new things every year. I don’t have “a program” to point at – that’s not the shape of a multi-year programming career.

Unfortunately, I don’t have good source links. My interesting concurrent code is all closed source, and my open source projects don’t directly deal with concurrency.


moved to Help :+1:


The expected outcome is to collect a bunch of realistic use cases for data structures used in concurrent applications.

The reason I’m doing this is that I believe, with free threading and subinterpreters coming to Python, there will be increasing interest in concurrent data structures.
My aim with cereggii is to provide a better developer experience when writing concurrent programs.

Collecting current usages of concurrent data structures (or any other synchronization primitive) I think can help future efforts.
For me, it certainly would, and possibly for others too.

The goal is to provide an RPC interface for a bunch of other processes communicating via IPC. One process reads the IPC messages and queues them for reading via the RPC. This is for a system that runs on a single host with limited access to the internet, so it doesn’t have to scale beyond a single user making roughly one request per second.
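If I’ve read that right, a minimal sketch of the shape (with illustrative names; queue.Queue standing in for the real IPC and RPC plumbing) would be:

```python
import queue
import threading

# One reader thread drains incoming "IPC" messages into a thread-safe queue;
# RPC handlers then pull from it on demand. Names are illustrative only.
messages = queue.Queue()

def ipc_reader(raw_msgs):
    for m in raw_msgs:
        messages.put(m)

def rpc_read_all():
    drained = []
    while True:
        try:
            drained.append(messages.get_nowait())
        except queue.Empty:
            return drained

reader = threading.Thread(target=ipc_reader, args=(["a", "b", "c"],))
reader.start()
reader.join()

drained = rpc_read_all()
assert drained == ["a", "b", "c"]
```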


There might be. Individual reads/writes are atomic, but

some_int += 1

isn’t (and might be useful). The same goes for other operations like exchange, compare-exchange, etc., which are commonly used in C/C++ code but aren’t natively available in Python.

Of course, any unsynchronized read-modify-write can result in logic errors.
And the read-modify-write may not be as obvious as the += example.
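For example, guarding the read-modify-write with a lock is the portable pure-Python fix today; a sketch:

```python
import threading

counter = 0
lock = threading.Lock()

def add_many(n):
    global counter
    for _ in range(n):
        # "counter += 1" expands to a read, an add, and a write; a thread
        # switch between those steps can silently lose an update.
        with lock:
            counter += 1

threads = [threading.Thread(target=add_many, args=(50_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert counter == 200_000  # no lost updates while the lock is held
```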

The Python cmpxchg would be this, I guess:

a, b = b, a

But I was specifically addressing the idea of atomics in Python.
In C/C++/Rust I would need these atomics for efficient threaded code.

I was saying that an AtomicInt/AtomicBool/AtomicFloat interface might be useful in Python to let people do this kind of operation without (explicit) locking. Of course it wouldn’t achieve C speed, and the types would necessarily be mutable rather than immutable. But it could be convenient.
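A minimal pure-Python sketch of what such an interface could look like (lock-based here, purely for illustration; a real implementation would use hardware atomics, and these method names are hypothetical, not any library’s actual API):

```python
import threading

class AtomicInt:
    """Lock-based sketch of an atomic integer (hypothetical API)."""

    def __init__(self, value=0):
        self._value = value
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            return self._value

    def add(self, delta):
        # Atomic read-modify-write; returns the new value (like fetch-add).
        with self._lock:
            self._value += delta
            return self._value

    def compare_and_set(self, expected, desired):
        # Classic cmpxchg: write only if the current value matches `expected`.
        with self._lock:
            if self._value == expected:
                self._value = desired
                return True
            return False

n = AtomicInt(0)
assert n.add(5) == 5
assert n.compare_and_set(5, 10) is True
assert n.compare_and_set(5, 99) is False  # value is 10 now, not 5
assert n.get() == 10
```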

It seems that reduce operates in a serial manner, which would block all other threads working with AtomicDict.

Here’s a typical setup I use for parallel processing:

Example
from multiprocessing import Process, Manager
import time

# Number of processes and range for prime counting
NUM_PROCESSES = 4
RANGE_START = 1
RANGE_END = 42_000  # Large range to be divided among processes


def is_prime(n):
    """Check if a number is prime using a naive approach."""
    if n <= 1:
        return False
    for i in range(2, n):  # Check all numbers from 2 to n-1
        if n % i == 0:
            return False
    return True


def count_primes_in_range(start, end):
    """Count prime numbers in a given range."""
    count = 0
    for i in range(start, end):
        if is_prime(i):
            count += 1
    return count


def manager_dict_update(proc_num, shared_dict, start, end):
    # Perform heavy computation on a subset of the range
    result = count_primes_in_range(start, end)
    # Update shared dictionary with the result for this process
    shared_dict[proc_num] = result


def single_process_count_primes():
    """Count primes in the entire range using a single process."""
    return count_primes_in_range(RANGE_START, RANGE_END)


# Benchmarking function
def benchmark():
    range_size = (RANGE_END - RANGE_START) // NUM_PROCESSES  # Divide range into equal parts

    # Manager Dictionary Benchmark with Multiple Processes
    with Manager() as manager:
        shared_dict = manager.dict()  # Shared dictionary for results
        processes = []

        for i in range(NUM_PROCESSES):
            start = RANGE_START + i * range_size
            end = start + range_size if i < NUM_PROCESSES - 1 else RANGE_END
            p = Process(target=manager_dict_update, args=(i, shared_dict, start, end))
            processes.append(p)

        start_time = time.time()
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        manager_dict_time = time.time() - start_time

        print(f"Manager dictionary time (Multiple Processes): {manager_dict_time:.2f} seconds")
        print("Prime counts by process (Manager dict):", dict(shared_dict))
        total_count_multi = sum(shared_dict.values())
        print("Total prime count (Multiple Processes):", total_count_multi)

    # Single Process Benchmark
    start_time = time.time()
    total_count_single = single_process_count_primes()
    single_process_time = time.time() - start_time

    print(f"Single process time: {single_process_time:.2f} seconds")
    print("Total prime count (Single Process):", total_count_single)


# Run the benchmark (the __main__ guard is needed for multiprocessing
# on platforms that spawn rather than fork)
if __name__ == "__main__":
    benchmark()

Exactly :dart:
I believe bubbling up that functionality to Python code would be beneficial.
Many other languages have them, there’s no reason why Python shouldn’t.

(The example you brought up is also implemented in AtomicInt64 where += x is an atomic lock-free operation.)

Totally agree :+1:
I was also considering having a single AtomicNumber, instead of separate AtomicInt / Float / Complex / MyCustomNumber classes.
Python is not Java; we don’t necessarily have to restrict an atomic numeric class to one kind of number.
Do you perhaps have any thoughts on this idea?


[nit]
That wouldn’t implement a behavior like cmpxchg, but more like xchg.
Also, it would not be atomic (here atomic is the proper word), despite being a one-liner that looks atomic, in the same way that C’s a++ is not atomic.
If a and b were shared and other threads were operating on them, this swap would not be thread-safe.

Furthermore, it’s entirely possible that the upcoming JIT will try to be clever optimizing that line, producing weird results for other threads observing a and b.


Thank you for sharing that! :heart:
I actually had no idea multiprocessing.Manager existed :eyes:

For this typical setup of yours, do you have any thoughts about free-threading?
It looks to me like exactly the sort of hoops you would go through just to circumvent the GIL.

Here I think AtomicDict would be a nice fit as well.

That is, you could rewrite it like this:
from cereggii import AtomicDict, NOT_FOUND
from threading import Thread
import time

...

def manager_dict_update(proc_num, shared_dict, start, end):
    # Perform heavy computation on a subset of the range
    result = count_primes_in_range(start, end)
    # Update shared dictionary with the result for this process
    # shared_dict[proc_num] = result  # this would work, but
    shared_dict.compare_and_set(key=proc_num, expected=NOT_FOUND, desired=result)
    # with AtomicDict.compare_and_set you either get:
    #   - the desired update (if proc_num not in shared_dict), or
    #   - an exception otherwise
    # raising here might be helpful to catch a bug somewhere else


# Benchmarking function
def benchmark():
    range_size = (RANGE_END - RANGE_START) // NUM_PROCESSES  # Divide range into equal parts

    shared_dict = AtomicDict()  # Shared dictionary for results
    processes = []

    for i in range(NUM_PROCESSES):
        start = RANGE_START + i * range_size
        end = start + range_size if i < NUM_PROCESSES - 1 else RANGE_END
        p = Thread(target=manager_dict_update, args=(i, shared_dict, start, end))
        processes.append(p)

    start_time = time.time()
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    manager_dict_time = time.time() - start_time
...

Do you have any thoughts?

No, it actually doesn’t.
Most methods of AtomicDict are lock-free and don’t block other threads.
I’ll make it explicit in the documentation, thank you for bringing it up!
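For reference, the usual lock-free update shape is a compare-and-set retry loop. Sketched here generically with a lock-based cell, since pure Python has no native CAS (the names are illustrative, not cereggii’s API):

```python
import threading

class Cell:
    """A single shared cell with compare-and-set; lock-based sketch only."""

    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            return self._value

    def compare_and_set(self, expected, desired):
        with self._lock:
            if self._value == expected:
                self._value = desired
                return True
            return False

def update(cell, fn):
    # The lock-free idiom: read, compute a new value, try to publish it,
    # and retry if another thread won the race in the meantime.
    while True:
        old = cell.get()
        if cell.compare_and_set(old, fn(old)):
            return

c = Cell(0)

def bump(times):
    for _ in range(times):
        update(c, lambda v: v + 1)

threads = [threading.Thread(target=bump, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert c.get() == 40_000  # every increment published exactly once
```

Under this scheme no thread ever waits for another while holding the data structure as a whole; a failed attempt just retries against the fresh value.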

My feeling is that a few well-defined types would probably end up more useful than a totally generic AtomicNumber. But maybe that’s my inner c++ programmer speaking.


Yep, true, that’s not the cmpxchg version.

But I wanted to show another bit of simple Python that can fail without locking.

Edit: s/with/without/
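One classic example of that kind (my illustration, not necessarily the one meant above): draining a shared container with a check-then-pop sequence.

```python
import threading
from collections import deque

# "if items: items.popleft()" is a check-then-act sequence: without a lock,
# another thread can empty the deque between the check and the pop,
# raising IndexError or double-counting work.
items = deque(range(100))
taken = []
lock = threading.Lock()

def consume():
    while True:
        with lock:  # makes check + pop + record one critical section
            if not items:
                return
            taken.append(items.popleft())

threads = [threading.Thread(target=consume) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert sorted(taken) == list(range(100))  # every item taken exactly once
```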