Add task pipeline to asyncio with capped parallelism and lazy input reading

Thanks for raising the topic! I agree that applying an async function to every item of an async iterator is quite a common problem.
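Just so we are talking about the same shape of API, here is a rough sketch of the helper I picture; the name `amap`, its signature, and the queue-based implementation are all mine, not part of the proposal:

```python
import asyncio

async def amap(fn, source, *, limit=8):
    """Apply async ``fn`` to every item of async iterable ``source``,
    keeping roughly ``limit`` calls in flight and reading input lazily.
    Results are yielded in input order."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=limit)
    done = object()  # sentinel marking the end of input

    async def feeder():
        async for item in source:
            # put() blocks while the window is full, so the source
            # is never read much further ahead than the parallelism cap
            await queue.put(asyncio.create_task(fn(item)))
        await queue.put(done)

    feeder_task = asyncio.create_task(feeder())
    try:
        while (task := await queue.get()) is not done:
            yield await task  # an exception in fn(item) propagates here
    finally:
        feeder_task.cancel()
        while not queue.empty():  # drop any tasks still in flight
            leftover = queue.get_nowait()
            if leftover is not done:
                leftover.cancel()
```

Usage would be `async for result in amap(fetch, urls, limit=10): ...`; note that with this shape an exception from `fn` escapes at the `async for`, which is exactly the design question below.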

Error handling is a major challenge for the proposed design.
Checking `if error is not None: ...` for each iterator element is tiresome; Python usually raises exceptions instead of returning them.
Yes, I know about the `return_exceptions` argument of `gather()`, but I still find it a little unnatural.
In itertools, if a predicate raises, the whole function raises. I personally prefer this approach: a user can wrap the callable in try/except on their own if needed, as in the sketch below.
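That is, with raise-by-default semantics the wrapping happens once, inside the callable, rather than at every consumption site (`fetch` below is a stand-in for any real async callable, and `amap` is the hypothetical helper sketched above):

```python
async def fetch(url):  # stand-in for any real async callable
    if "bad" in url:
        raise OSError(f"cannot reach {url}")
    return f"body of {url}"

async def safe_fetch(url):
    # per-item handling lives in the callable, not in the pipeline API
    try:
        return await fetch(url)
    except OSError as exc:
        return exc  # or log it and substitute a sentinel value

# async for result in amap(safe_fetch, urls):
#     ...  # result is either a value or a captured exception
```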

Speaking of `await gather(*tasks, return_exceptions=True)`, I would say that in my real code I always need to know which task failed, if only for meaningful logging; more complicated error handling requires more context than just the exception value.
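A sketch of what I mean: `gather()` returns a bare positional list, so to log anything useful I end up zipping the inputs back onto the outcomes by hand (`fetch` is the same stand-in as above):

```python
import asyncio

async def fetch(url):  # stand-in for any real async callable
    if "bad" in url:
        raise OSError(f"cannot reach {url}")
    return f"body of {url}"

async def main():
    urls = ["https://ok.example", "https://bad.example"]
    results = await asyncio.gather(
        *(fetch(u) for u in urls), return_exceptions=True
    )
    # zip inputs back onto outcomes to recover *which* call failed
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"{url} failed: {result!r}")
        else:
            print(f"{url} -> {result}")

asyncio.run(main())
```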

There are other things to consider: should the API work with sync callables and iterators, should it keep input order or support reordering for faster delivery of results, plus some implementation notes; but all these questions can be discussed after making a decision about error handling.
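On the ordering point, the trade-off is essentially `gather()`-style input order versus `asyncio.as_completed()`-style completion order; a tiny sketch of the latter, just to show why reordering can deliver the first results sooner:

```python
import asyncio

async def work(i):
    await asyncio.sleep((5 - i) * 0.1)  # later items finish first
    return i

async def main():
    # completion order: quick results are not blocked behind slow ones
    for fut in asyncio.as_completed([work(i) for i in range(5)]):
        print(await fut)  # prints 4, 3, 2, 1, 0

asyncio.run(main())
```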