Asyncio manage dynamic list of task

I have a main class which creates an instance from a list of plugins. In the example below, the ‘dhcpv6’ plugin is launched and it listens for subscriptions on the 'subject_test_subscribe" subject. As soon as the handler intercepts a new message on this subject, it creates an new asynchronous task by instantiating the classe “Dhcpv6_child().run()” which represents a child of the class Dhcpv6.py. Each child have a timeout to kill the process child. For information, the class plugin definied in init .py is an abstract class which allows to load plugins.

I’m not able to dynamically manage the addition of new tasks with asyncio.gather (until the current task is finished, the new task is not executed). The problem is solved with asyncio.create_subprocess_exec but I’d prefer to use the possibilities of asyncio

main.py

import argparse
import asyncio
import importlib

parser = argparse.ArgumentParser()
parser.add_argument('--nats', '-n', nargs="?", type=str, required=True, help="adresse IP du serveur NATS")
parser.add_argument('--plugins', '-p', nargs="+", help="Liste de plugins a utiliser")
args = parser.parse_args()

print("************** main() *****************")
print(f"NATS server IP: {args.nats}")
print(f"List of plugins to load: {args.plugins}")

async def main():
    tasks = []
    for type_plugin in args.plugins:
        try:
            module = importlib.import_module(f'Plugins.{type_plugin}')
            my_class = getattr(module, type_plugin)
            my_instance = my_class(nats=args.nats)
            tasks.append(asyncio.create_task(my_instance.run()))
        except Exception as e:
            print("Erreur chargement du plugin : ", type_plugin, ":",  e)

    try:
        await asyncio.gather(*tasks)
    except asyncio.TimeoutError:
        print("timeout main")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.create_task(main())
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)
    finally:
        loop.close()

Dhcpv6.py

import asyncio
import json
import logging
import sys

from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from Plugins import Plugin
from Plugins.Dhcpv6_child import Dhcpv6_child

logger = logging.getLogger("dhcpv6")
logging.basicConfig(level=logging.DEBUG)


class Dhcpv6(Plugin):
    name = "Dhcpv6 plugin"
    subject = "subject_test_subscribe"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.nats = kwargs.get('nats')
        self.list_task = []
        self.timeout_task = 5
        print("Class dhcpv6 ==> constructor /  nats_ip : {}".format(self.nats))


    async def run(self):
        print("******* run DHCPV6 ***********")
        nc = NATS()
        try:
            await nc.connect("127.0.0.1", verbose=True, pedantic=True)
        except ErrConnectionClosed:
            print(f"La connexion a ete fermee inopinement")
            return
        except ErrTimeout:
            print(f"Le delai imparti a la connexion est depasse")
            return
        except ErrNoServers:
            print(f"Aucun serveur n'a repondu a temps")
            return
        except Exception as e:
            print(f"Exception inattendue: {e}")
            return

        async def plugin_handler(msg):
            print(msg)
            try:
                # Creating and running a new task on demand
                self.list_task.append(asyncio.wait_for(
                    asyncio.create_task(Dhcpv6_child().run()), timeout=self.timeout_task))
                print("append a new task : ", self.list_task)
            except:
                print("Error append new task")

            try:
                print("Running a new task : ", self.list_task)
                await asyncio.wait_for(asyncio.gather(*self.list_task), timeout=3600.0)
            except asyncio.TimeoutError:
                print("timeout")


        print(f"Subscribing test on : {self.subject}")
        await nc.subscribe(f"{self.subject}", cb=plugin_handler)

        while nc.is_connected:
            await asyncio.sleep(0.5)
        await nc.drain()

Output screen example

************** main() *****************
NATS server IP: 127.0.0.1
List of plugins to load: : ['Dhcpv6']


------------ Load a list of plugins : {Dhcpv6} -------
Class plugin ==> constructor
Class dhcpv6 ==> constructor /  nats_ip : 127.0.0.1

******* run DHCPV6 ***********
INFO:Plugin:Plugin Dhcpv6 plugin loaded
Subscribing test on : subject_test_subscribe
<Msg: subject='subject_test_subscribe' reply='' data='{"query": ...'>

Class CHILD ==> constructor
append a new task :  [<coroutine object wait_for at 0x0000026D0310F040>]
Runnig a new task :  [<coroutine object wait_for at 0x0000026D0310F040>] ```

you could use anyio.create_task_group():

import anyio


class Dhcpv6(Plugin):
    async def run(self):
        print("******* run DHCPV6 ***********")
        nc = NATS()
        try:
            await nc.connect("127.0.0.1", verbose=True, pedantic=True)
        except ErrConnectionClosed:
            print(f"La connexion a ete fermee inopinement")
            return
        except ErrTimeout:
            print(f"Le delai imparti a la connexion est depasse")
            return
        except ErrNoServers:
            print(f"Aucun serveur n'a repondu a temps")
            return
        except Exception as e:
            print(f"Exception inattendue: {e}")
            return

        async def plugin_handler(msg):
            print(msg)

            async def with_timeout(async_fn):
                with anyio.move_on_after(self.timeout_task):
                    return await async_fn()

            tg.start_soon(with_timeout, Dhcpv6_child().run)

        with anyio.move_on_after(3600):
            async with anyio.create_task_group() as tg:
                print(f"Subscribing test on : {self.subject}")
                await nc.subscribe(f"{self.subject}", cb=plugin_handler)

                while ns.is_connected:
                    await anyio.sleep(0.5)

                tg.cancel_scope.cancel()

        await nc.drain()