I have been working with async iterators a lot recently, and one thing I have found awkward is handling exceptions raised during iteration. For context, my use case (fairly common, I imagine) is iterating asynchronously over IO streams, e.g. MQTT or Kafka messages. These systems can raise errors when you receive a message, and sometimes you want to handle the exception and carry on iterating. I would like to use async for over such streams, but it is cumbersome to catch an error raised during iteration and then continue. To address this, I would like to propose a new feature: try async for (and, for consistency, try for).
To illustrate, imagine we have a mock IO function and a class that iterates over the fetched results:
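import asyncio
import random

# Note: this name shadows the built-in IOError (an alias of OSError) in this module.
class IOError(Exception):
    pass

async def io(x):
    # Mock IO call: fails roughly half of the time.
    if random.random() > 0.5:
        raise IOError
    else:
        return x

class MyIterator:
    def __init__(self):
        self.n = 0

    async def __anext__(self):
        try:
            result = await io(self.n)
        finally:
            # Advance even when io() raises, so a retry moves on to the next item.
            self.n += 1
        if self.n > 10:
            raise StopAsyncIteration
        return result

    def __aiter__(self):
        return self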
So the question is: what if an IOError is raised, but I still want to continue the iteration?
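To make the problem concrete, here is a minimal sketch of what happens with a plain async for wrapped in try/except. FailsOnOdd, consume and demo are hypothetical names introduced only for this illustration; FailsOnOdd is a deterministic stand-in for MyIterator (it fails on odd items instead of at random) so the outcome is reproducible.

```python
import asyncio


class FailsOnOdd:
    """Hypothetical, deterministic stand-in for MyIterator (not from the post)."""

    def __init__(self, limit=6):
        self.n = 0
        self.limit = limit

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n >= self.limit:
            raise StopAsyncIteration
        current = self.n
        self.n += 1
        if current % 2:
            # Simulated IO failure on every odd item.
            raise RuntimeError(f"could not fetch item {current}")
        return current


async def consume(iterator):
    seen = []
    try:
        async for item in iterator:
            seen.append(item)
    except RuntimeError:
        # By the time we get here the async for statement has already ended;
        # the remaining items are not visited unless a new loop is started.
        pass
    return seen


def demo():
    return asyncio.run(consume(FailsOnOdd()))


if __name__ == "__main__":
    print(demo())
```

The loop stops at the first failure, so only the item fetched before it is collected, even though later items would have been readable.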
One option is to abandon the for loop and use a while loop:
my_iter = MyIterator()
while True:
    try:
        result = await my_iter.__anext__()
        print(result)
    except IOError:
        pass
    except StopAsyncIteration:
        break
This works, but it's not very Pythonic and defeats the point of async for. Another option is to catch the exception outside the loop and then resume iterating:
my_iter = MyIterator()
while True:
    try:
        async for res in my_iter:
            print(res)
        else:
            break
    except IOError:
        pass
To me this is difficult to read and excessively nested. To avoid these problems, we might consider changing the iterator to return a sentinel value instead of raising an exception:
class MyIterator:
    def __init__(self):
        self.n = 0

    async def __anext__(self):
        try:
            result = await io(self.n)
        except IOError:
            return None
        finally:
            self.n += 1
        if self.n > 10:
            raise StopAsyncIteration
        return result

    def __aiter__(self):
        return self
We can then iterate like so:
async for result in MyIterator():
    if result is not None:
        print(result)
However, this is also not very Pythonic, as we are meant to use exceptions, not sentinel values, to signal exceptional conditions.
To solve this, I propose a simple syntax extension: allow try before async for or for loops. Going back to the first class, which raises the exception from its __anext__ method, my example would look like:
try async for result in MyIterator():
    print(result)
except:
    pass
The semantics here are: if the __anext__ method raises an exception (other than StopAsyncIteration, of course, or StopIteration from __next__ in the plain for case), the except branch runs and the iteration then continues. To me this seems much cleaner, and it is fairly intuitive how the syntax works. finally would work in a similar fashion:
try async for result in MyIterator():
    print(result)
except:
    pass
finally:
    print("Always prints!")
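For reference, the except-branch semantics described above can be approximated in current Python by a helper along these lines. This is only a rough sketch of one possible expansion, not a specification: try_async_for, FlakyCounter and demo are hypothetical names, FlakyCounter is a deterministic stand-in for MyIterator, and I am assuming that exceptions raised by the loop body itself are not routed to the except branch. The finally clause is left out because its exact timing would need to be pinned down first.

```python
import asyncio


class FlakyCounter:
    """Hypothetical stand-in for MyIterator: fails on odd items, stops at `limit`."""

    def __init__(self, limit):
        self.n = 0
        self.limit = limit

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n >= self.limit:
            raise StopAsyncIteration
        current = self.n
        self.n += 1
        if current % 2:
            raise RuntimeError(f"could not fetch item {current}")
        return current


async def try_async_for(iterable, body, on_error):
    """Approximate `try async for item in iterable: body(item) / except: on_error(exc)`."""
    iterator = iterable.__aiter__()
    while True:
        try:
            item = await iterator.__anext__()
        except StopAsyncIteration:
            # Normal end of iteration: never handed to the except branch.
            break
        except Exception as exc:
            # Any other error from __anext__ runs the handler, then iteration resumes.
            on_error(exc)
            continue
        # Assumption: errors raised by the body propagate as usual.
        body(item)


def demo():
    results, errors = [], []
    asyncio.run(try_async_for(FlakyCounter(5), results.append, errors.append))
    return results, errors


if __name__ == "__main__":
    print(demo())
```

Written this way, the proposal is essentially sugar for the while loop shown earlier, with the StopAsyncIteration handling hidden from the user.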
What do you think about this new syntax addition? Grateful for any feedback.