Async example
import asyncio
class AsyncPipe:
    """Sequential pipeline of async stages applied to an initial value.

    Each registered stage is paired with an ``asyncio.Future`` that receives
    that stage's result (or exception) when ``run()`` reaches it, so callers
    can observe intermediate results as well as the final one.
    """

    def __init__(self, value):
        """Store the seed *value*; stages are added later via ``__pipe__``."""
        self.value = value
        # List of (stage_callable, future) pairs, in execution order.
        self.tasks = []

    def _cancel_from(self, start):
        """Cancel the futures of every stage at index *start* or later."""
        for _, pending in self.tasks[start:]:
            # Future.cancel() is a no-op on futures that are already done.
            pending.cancel()

    async def run(self):
        """Run all stages in order, feeding each the previous result.

        Returns:
            The value produced by the last stage (the seed value when no
            stage is registered).

        Raises:
            Exception: whatever a stage raises. The failing stage's future
                gets the exception and all later futures are cancelled.
            asyncio.CancelledError: when the pipeline is cancelled. The
                interrupted stage's future and all later ones are cancelled
                so nothing awaiting them is left hanging.
        """
        value = self.value
        for i, (stage, fut) in enumerate(self.tasks):
            try:
                value = await stage(value)
            except asyncio.CancelledError:
                # CancelledError is a BaseException on Python 3.8+, so it
                # would slip past ``except Exception`` and leave the
                # unresolved futures pending forever.
                self._cancel_from(i)
                raise
            except Exception as exc:
                # The caller may already have cancelled this future;
                # set_exception() would then raise InvalidStateError.
                if not fut.done():
                    fut.set_exception(exc)
                self._cancel_from(i + 1)
                raise
            if not fut.done():
                fut.set_result(value)
        return value

    def __pipe__(self, rhs, rhs_noinject, last):
        """Register one stage; hook for the proposed ``|>`` operator.

        NOTE(review): ``__pipe__`` is not part of current Python; the exact
        calling convention is assumed from how the arguments are used here.

        Args:
            rhs: Unused by this implementation.
            rhs_noinject: Called with ``None``; its return value is stored as
                the stage and must be callable with the running value,
                returning an awaitable.
            last: When truthy, schedule ``run()`` as a task and hand that
                task back instead of the stage's future.

        Returns:
            ``(self, awaitable)`` where *awaitable* is the stage's future,
            or the task running the whole pipeline when *last* is truthy.
        """
        p = rhs_noinject(None)
        fut = asyncio.Future()
        self.tasks.append((p, fut))
        if last:
            # create_task() needs a running event loop.
            return (self, asyncio.create_task(self.run()))
        return (self, fut)
def async_partial(func, *args, **kwargs):
    """Return an async wrapper around *func* with arguments pre-bound.

    The wrapper sleeps for one second (presumably standing in for real
    asynchronous work in this example) and then calls *func* synchronously.

    Args:
        func: Synchronous callable to wrap.
        *args: Positional arguments placed before the call-time ones.
        **kwargs: Keyword arguments pre-bound to *func*.

    Returns:
        A coroutine function. Awaiting its call yields
        ``func(*args, *call_args, **merged_kwargs)``.
    """
    async def inner(*inner_args, **inner_kwargs):
        await asyncio.sleep(1)
        # Merge with call-time keywords taking precedence, matching
        # functools.partial; ``**kwargs, **inner_kwargs`` would raise
        # TypeError when the same keyword appears in both.
        merged = {**kwargs, **inner_kwargs}
        return func(*args, *inner_args, **merged)
    return inner
async def async_list(x):
    """Wait one second, then collect the iterable *x* into a new list."""
    await asyncio.sleep(1)
    # Consume the iterable only after the delay has elapsed.
    collected = []
    collected.extend(x)
    return collected
# Demo of the pipeline using the proposed `|>` operator.
# NOTE: `|>` is not valid in current Python, and top-level `await` only works
# in an async-aware console (the transcript shows Pyodide task objects).
# Each stage is captured with a walrus binding; per the transcript, `_1` and
# `_2` end up as futures holding intermediate results and `_3` as the task
# driving the whole pipeline -- presumably via the `__pipe__` return value.
x = await (
AsyncPipe("lorem ipsum dolor sit amet")
|> (_1 := async_partial(str.split, maxsplit=2))
|> (_2 := async_partial(map, str.capitalize))
|> (_3 := async_list)
)
>>> x
['Lorem', 'Ipsum', 'Dolor sit amet']
>>> _
<PyodideTask finished name='Task-217' coro=<AsyncPipe.run() done, defined at <console>:5> result=['Lorem', 'Ipsum', 'Dolor sit amet']>
>>> _1
<Future finished result=['lorem', 'ipsum', 'dolor sit amet']>
>>> _2
<Future finished result=<map object at 0xd9a010>>
>>> _3
<PyodideTask finished name='Task-217' coro=<AsyncPipe.run() done, defined at <console>:5> result=['Lorem', 'Ipsum', 'Dolor sit amet']>
Clearly, these examples are not fundamentally different from overriding any other operator. However, the added value could indeed be in the `last` argument and in the newly added support for the walrus operator together with `__pipe__()`: these make it possible to neatly capture all the intermediate async results. @dg-pb, what do you think? Currently, this definitely cannot be done as cleanly with operator overloading alone.
I am getting back to PEP writing.