I was working on a small project called Gatling, which aims to unify different kinds of task execution models into a single, flexible interface.
Goal
Support both sync and async functions — including ordinary functions, iterators, and async iterators — with configurable execution backends:
- Async iterators → can be executed in coroutine mode
- Normal functions → can be executed using threads or processes
Current Progress
I’ve already implemented the coroutine and thread parts.
I’ve found it quite difficult to implement process-based execution. I’ve tried several approaches (Loky, multiprocessing.Queue, lightweight local network servers, etc.), but they’re either too complex or don’t work properly. Redis works, but it would be too heavy: I’d like to support a Redis-backed queue eventually, but the package should work properly on its own, without requiring any external servers.
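For anyone reviewing: a minimal sketch of the kind of multiprocessing.Queue pattern I mean (this is my illustration, not Gatling’s actual code; the `worker` function and the `None` sentinel are assumptions):

```python
import multiprocessing as mp

def worker(in_q: mp.Queue, out_q: mp.Queue) -> None:
    # pull tasks until the None sentinel arrives, then exit
    for item in iter(in_q.get, None):
        out_q.put(item * item)

if __name__ == "__main__":
    in_q, out_q = mp.Queue(), mp.Queue()
    p = mp.Process(target=worker, args=(in_q, out_q))
    p.start()
    for i in range(3):
        in_q.put(i)
    in_q.put(None)  # sentinel: tell the worker to stop
    results = [out_q.get() for _ in range(3)]
    p.join()
    print(results)  # [0, 1, 4]
```

This works, but it already shows the pain points: the worker must be a picklable top-level function, and everything must live under the `if __name__ == "__main__":` guard on spawn-based platforms.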
If anyone is interested in reviewing the code, contributing to the project, or suggesting a cleaner design, please feel free to reach out: honyzeng7@gmail.com
```python
wait_queue()

# some functions that do some work and push new elements to the queue
def fctn(x):
    # do some stuff
    # push some new element to the queue
    ...

tfm = TaskFlowManager(retry_on_error=False)
with tfm.executor():
    async_node = tfm.register_async(async_func, workers=2)
    thread_node = tfm.register_thread(thread_func, workers=2)
    process_node = tfm.register_process(process_func, workers=1)
    dag = process_node(thread_node(async_node(wait_queue)))
    tfm.register_flow(dag)
    tfm.run()
```
Example log output:

```text
[00:00:25] remain=00:03:45 125.6 iter/sec success_rate=0.98
wait[1] >
async_io_task(5|10)[11] >
io_task(3|10)[12] >
cpu_task(2|10)[12] >
final_aggregate(1|10)[30]
```
With this TaskFlowManager, all we need to do is write a few simple functions, register them with the class, and define how many resources we want for each function. The results then automatically flow out of the pipeline. I’ve looked at other task-flow APIs, but none of them seem to meet these simple requirements.
I’ve moved this from Ideas to Python Help since it doesn’t seem to be an idea for improving Python itself.
I’m unclear as to whether or not there’s a question here. The thread title is a question, but when I read the message in detail, there doesn’t seem to be anything to answer.
Sounds like a fun project. If you want other people to use it, you should consider publishing the code somewhere (e.g., packaging it for PyPI). I’d also suggest looking at existing tools like Celery and Dramatiq for inspiration.
It’s a bit more like the old-fashioned manual subprocess.Popen than subprocess.run, but I found that a wrapper coroutine that calls asyncio.create_subprocess_exec works nicely.
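A minimal sketch of that wrapper-coroutine pattern, using the stdlib asyncio subprocess API (the `run_cmd` name and error handling are my own):

```python
import asyncio
import sys

async def run_cmd(*args: str) -> str:
    # launch the child process without blocking the event loop
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(stderr.decode())
    return stdout.decode()

if __name__ == "__main__":
    out = asyncio.run(run_cmd(sys.executable, "-c", "print('hello')"))
    print(out.strip())  # hello
```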
Thanks for the explanation! I totally get what you mean about wrapping asyncio.create_subprocess_exec – that’s a nice way to avoid the old manual Popen dance inside async code.
My main goal with this “taskflow” thing, though, isn’t really “how do I start a process from async” as such. What I’m trying to do is separate different kinds of work and push them onto different kinds of resources:
- some functions are pure I/O (e.g. talking to an API),
- some are network I/O,
- some are local disk I/O,
- some are CPU-bound,
and then have an execution layer that can schedule these onto appropriate backends (async I/O, threads, processes, etc.) so that overall we use fewer resources and run faster.
That’s why I’ve been focusing on process-based execution specifically: not just to “wrap a process with async”, but to see if there’s a clean pattern for running CPU-bound pieces in separate processes in a way that’s ergonomic and cross-platform.
On Windows it’s particularly painful because we’re stuck with the spawn start method, so even if you wrap the process in async, the fundamental costs and constraints don’t go away:
- process creation and inter-process communication overhead,
- arguments and callables need to be picklable (so lots of lambdas/closures don’t work),
- the whole `if __name__ == "__main__":` requirement,
- functions needing to live at module top level so they can be imported by the child process.
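The pickling constraint is easy to demonstrate with just the stdlib (`top_level` is an illustrative name):

```python
import pickle

def top_level(x):
    # module-level functions pickle by reference (their qualified name),
    # so a child process can re-import them
    return x * 2

restored = pickle.loads(pickle.dumps(top_level))
print(restored(21))  # 42

# lambdas (and closures) have no importable name, so pickling fails
try:
    pickle.dumps(lambda x: x * 2)
except Exception as exc:
    print(type(exc).__name__)
```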
That’s the part I find “hard”: not the async layer itself, but the combination of process semantics + Windows + pickling and IPC.
My question is less about how to await a subprocess nicely, and more about whether there’s a good pattern or library for building a taskflow/executor that allocates resources to different task types and runs smoothly on multiple operating systems.
If you’ve found any approaches that deal well with that side of things, I’d really love to hear about them.
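To make the question concrete, here is the baseline stdlib pattern I’m comparing against: one executor per task type, bridged into asyncio with `loop.run_in_executor` (the task functions are illustrative):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def cpu_task(n: int) -> int:
    # CPU-bound work: must be a picklable, top-level function
    return sum(i * i for i in range(n))

def blocking_io(x: int) -> int:
    # blocking disk or network call: fine in a thread
    return x + 1

async def main() -> list:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=1) as procs, \
         ThreadPoolExecutor(max_workers=2) as threads:
        return list(await asyncio.gather(
            loop.run_in_executor(procs, cpu_task, 10),
            loop.run_in_executor(threads, blocking_io, 41),
        ))

if __name__ == "__main__":
    print(asyncio.run(main()))  # [285, 42]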
I must confess I’m skeptical about the need for Gatling and don’t understand the point of it, let alone the value it adds over writing my code without its paradigm. I just wanted to make that one specific suggestion, based on a recent positive experience.
> building a taskflow/executor that allocates resources to different task types and runs smoothly on multiple operating systems.
Is the user’s client in a main Python process ever in charge of those compute resources? Normally Python has to request them from the OS, or from some other service, doesn’t it?
> On Windows it’s particularly painful
My 4GB Windows laptop works great… …for spinning up cloud Linux servers.
When I need loads of different processes, I like to set them all out in a Docker Compose file, and let the host OS and container engine figure out resource scheduling. If I ever need to launch new processes dynamically, I’ll stop merely reading about Kubernetes and actually start using it.
I understand that tools like Docker or Kubernetes are powerful for resource management and distributed workloads. You could even use Kafka to handle high-volume queues and implement a publish–subscribe model. But for many common Python tasks, using them is overkill and overly complicated.
What I want is a lightweight but powerful framework that lets you fully utilize local resources with just a few lines of code. For example:
Line 1: define an I/O-bound task (like a network request).
Line 2: define a CPU-bound task (like a heavy computation).
Line 3: define a disk-bound task (like writing to a file).
The framework would then automatically decide how to execute each task — using async for I/O, multiple cores for CPU-heavy work, and threads for disk operations — all without the developer worrying about low-level configuration.
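A hypothetical sketch of what that interface could look like; the `@task` decorator and its `kind=` label are my invention for illustration, not an existing library:

```python
import asyncio
import functools
import inspect
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

_threads = ThreadPoolExecutor(max_workers=4)  # disk / blocking I/O
_procs = ProcessPoolExecutor(max_workers=2)   # CPU-bound work

def task(kind="io"):
    # route a function to a backend chosen by its declared kind
    def wrap(func):
        @functools.wraps(func)
        async def runner(*args):
            if inspect.iscoroutinefunction(func):
                return await func(*args)  # async I/O stays on the event loop
            loop = asyncio.get_running_loop()
            pool = _procs if kind == "cpu" else _threads
            return await loop.run_in_executor(pool, func, *args)
        return runner
    return wrap

@task(kind="disk")
def save(data):
    # stand-in for a blocking disk write
    return len(data)

if __name__ == "__main__":
    print(asyncio.run(save("hello")))  # 5
```

The caller only declares what kind of work a function does; the dispatcher decides where it runs.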
More importantly, this framework focuses on defining what needs to be done and what resources should be used, not how to do it. I don’t want to describe the execution process in detail or optimize every layer manually. That just makes things too complex. Instead, I want to express these tasks in the simplest way possible and let the framework complete them as fast as possible with minimal resource usage.
Right now, I haven’t seen a framework that really does this. Of course, you could achieve something similar with Docker, Kubernetes, or even microservices, but that’s far too heavy for small or local Python workloads.
Your ambition is fantastic! Have you thought about making Gatling get the most out of the GIL-free, free-threading Python builds?
Async and threading bugs can be particularly complicated. So if the tasks must all be in the same application (i.e. we can’t refactor anything more into separate Docker/K8s services), then personally I prefer to use the native features of the language and decide what runs what myself. Task execution is a first-class concern for me.
Thanks! I’ll support the GIL-free builds later on, but it’s not a top priority right now. I’m currently focusing more on optimizing database and Redis connections, plus some multi-GPU scheduling work. The project is now at version 0.3.0.0, which already supports multiprocessing through the “multiprocess” and “dill” libraries.