The following seems to work (I have not tested it extensively):
- Keep all action callbacks of cmd.Cmd synchronous.
- Wrap every action callback with asyncio.run(run()), where run() is the async API function.

asyncio.run() blocks synchronously until the async function has completed.
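To illustrate the blocking behavior, here is a minimal sketch. The coroutine body and the 0.2 second delay are placeholders of mine, not part of the actual shell:

```python
import asyncio
import time


async def run():
    # Stand-in for the real async API call (assumed for illustration)
    await asyncio.sleep(0.2)
    return "done"


start = time.monotonic()
result = asyncio.run(run())  # does not return before run() has finished
elapsed = time.monotonic() - start
print(result, f"{elapsed:.2f}s")
```

The caller only continues once the coroutine has returned, which is exactly what a synchronous cmd.Cmd callback needs.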
The wrapping is a bit repetitive, since each action callback needs its own asyncio.run() call, but it is the best I have come up with. Full example code:
import cmd
import asyncio


class MyShell(cmd.Cmd):
    def do_myaction(self, arg):
        async def run():
            await asyncio.sleep(3)
            print("Some app results")

        # Blocks the shell thread until run() has completed
        asyncio.run(run())


async def main():
    await asyncio.gather(shell(), other_task())
    print("Exiting...")


async def shell():
    def run():
        MyShell().cmdloop()

    # Run in a separate thread to make interactive shell I/O non-blocking
    await asyncio.to_thread(run)


async def other_task():
    # Represents some concurrent initialization task
    await asyncio.sleep(1)
    print("Concurrent init task")


asyncio.run(main())
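
One possible way to reduce the per-callback repetition is a small decorator. This is only a sketch: run_async is a hypothetical helper name of mine, not something provided by cmd or asyncio, and I have not tested it beyond simple cases.

```python
import asyncio
import cmd
import functools


def run_async(coro_func):
    """Hypothetical helper: expose an async do_* method as a sync callback."""
    @functools.wraps(coro_func)  # keeps the docstring, which cmd uses for help
    def wrapper(self, arg):
        # Blocks the calling thread until the coroutine has completed
        return asyncio.run(coro_func(self, arg))
    return wrapper


class MyShell(cmd.Cmd):
    @run_async
    async def do_myaction(self, arg):
        """Run some async work and print its result."""
        await asyncio.sleep(0.1)
        print("Some app results")
```

cmd.Cmd still sees do_myaction as an ordinary synchronous method, so the shell() and main() functions above should work unchanged. As before, this relies on the shell running in its own thread: asyncio.run() raises a RuntimeError if an event loop is already running in the calling thread.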