I created three new `ET.iterparse` implementations that do not create a new function and a new class on each call, and that have a `close` method (issue).
A quick description of all of them:
- The 1st is fully compatible, but reads more slowly than the current implementation.
- The 2nd is compatible too, but in the following case the file will be closed only at garbage collection, not when the exception is caught:
try:
for event, el in iterparse2(file):
...
except Exception: pass
# go on
It can be fixed by adding `__enter__` and `__exit__`, but this will result in an additional level of indentation:
with iterparse2(file) as it:
for ev, el in it:
pass
This implementation has a faster read time than the 1st one, but is still slower than the current one.
- The 3rd implementation is like the 1st, but does not define `__next__` and returns the inner iterator from `__iter__`. Such a change allows a faster reading time, but is not compatible (the current implementation is an iterator).
- All implementations have a faster creation time, but a slower read time.
- Each implementation closes source file on object deletion.
code
from timeit import repeat
from xml.etree.ElementTree import XMLPullParser, iterparse as iterparse_old
class iterparse1:
    """Incrementally parse an XML source into ``(event, element)`` pairs.

    *source* is either an object with a ``read()`` method, which is used
    as-is and never closed here, or anything accepted by ``open()``, which is
    opened in binary mode and closed by this object.  *events* and *parser*
    are forwarded to ``XMLPullParser``.

    The instance is its own iterator.  Once the source has been read to the
    end, ``root`` holds the value returned by the parser's
    ``_close_and_return_root()``; until then it is ``None``.

    While the internal generator is unfinished it refers back to the
    instance, so ``__del__`` is only reached when that reference cycle is
    garbage collected.  For deterministic cleanup call ``close()`` or use the
    instance as a context manager::

        with iterparse1(path) as it:
            for event, elem in it:
                ...
    """

    __slots__ = '_source', '_source_opened', '_parser', 'root', '_it'

    def __init__(self, /, source, events=None, parser=None):
        # The flag must exist before open() is attempted: if open() raises,
        # __del__ still runs on the half-built object and calls close().
        self._source_opened = False
        if not hasattr(source, 'read'):
            self._source = open(source, 'rb')
            self._source_opened = True
        else:
            self._source = source
        self._parser = XMLPullParser(events=events, _parser=parser)
        self.root = None
        self._it = self._iterator()

    def __iter__(self, /):
        return self

    def __next__(self, /):
        return next(self._it)

    def __enter__(self, /):
        """Return the iterator itself so it can be looped over in ``with``."""
        return self

    def __exit__(self, /, *exc_info):
        """Close the source on leaving the ``with`` block; never suppress."""
        self.close()

    def _iterator(self, /):
        """Generate parser events, reading the source in 16 KiB chunks."""
        source = self._source
        parser = self._parser
        try:
            while data := source.read(16 * 1024):
                parser.feed(data)
                yield from parser.read_events()
            root = parser._close_and_return_root()
            # Deliver anything still queued after the parser was closed.
            yield from parser.read_events()
            self.root = root
        finally:
            # Runs on exhaustion, on a parse/read error, and when the
            # generator itself is closed.
            self.close()

    def close(self, /):
        """Close the source if this object opened it; safe to call twice."""
        if self._source_opened:
            self._source.close()
            self._source_opened = False

    def __del__(self, /):
        self.close()
class iterparse2:
    """Generator-free incremental XML parser yielding ``(event, element)``.

    *source* is either an object with a ``read()`` method, which is used
    as-is and never closed here, or anything accepted by ``open()``, which is
    opened in binary mode and closed by this object.  *events* and *parser*
    are forwarded to ``XMLPullParser``.

    ``__next__`` drains the parser's current event batch and refills it by
    reading another 16 KiB chunk when the batch is empty.  When the source
    is exhausted ``root`` is set to the value returned by the parser's
    ``_close_and_return_root()`` and the source is closed.

    If reading or parsing raises, the source is closed before the exception
    propagates and the iterator is finished (later ``next()`` calls raise
    ``StopIteration``).  An exception raised by the caller's own loop body is
    not seen by this object; use ``close()`` or a ``with`` block for that.
    """

    __slots__ = ('_source', '_source_opened', '_parser', '_root', 'root',
                 '_it', '_exhausted')
    _empty_tuple = ()

    def __init__(self, /, source, events=None, parser=None):
        # The flag must exist before open() is attempted: if open() raises,
        # __del__ still runs on the half-built object and calls close().
        self._source_opened = False
        if not hasattr(source, 'read'):
            self._source = open(source, 'rb')
            self._source_opened = True
        else:
            self._source = source
        self._parser = XMLPullParser(events=events, _parser=parser)
        self._root = self.root = None
        # Start with an empty batch so the first __next__ triggers a read.
        self._it = iter(self._empty_tuple)
        self._exhausted = False

    def __iter__(self, /):
        return self

    def __next__(self, /):
        try:
            # Events are tuples, so None is a safe "batch is empty" marker.
            event = next(self._it, None)
            while event is None and not self._exhausted:
                data = self._source.read(16 * 1024)
                if data:
                    self._parser.feed(data)
                else:
                    self._root = self._parser._close_and_return_root()
                    self._exhausted = True
                # Also fetched after the final close, in case closing the
                # parser queued further events.
                self._it = self._parser.read_events()
                event = next(self._it, None)
        except BaseException:
            # A failed iterator is finished: release the file now instead of
            # waiting for __del__, and make further next() calls stop.
            self._exhausted = True
            self._it = iter(self._empty_tuple)
            self.close()
            raise
        if event is None:
            self.root = self._root
            self.close()
            raise StopIteration
        return event

    def __enter__(self, /):
        """Return the iterator itself so it can be looped over in ``with``."""
        return self

    def __exit__(self, /, *exc_info):
        """Close the source on leaving the ``with`` block; never suppress."""
        self.close()

    def close(self, /):
        """Close the source if this object opened it; safe to call twice."""
        if self._source_opened:
            self._source.close()
            self._source_opened = False

    def __del__(self, /):
        self.close()
class iterparse3:
    """Iterable wrapper around an incremental XML event generator.

    *source* is either an object with a ``read()`` method, which is used
    as-is and never closed here, or anything accepted by ``open()``, which is
    opened in binary mode and closed by this object.  *events* and *parser*
    are forwarded to ``XMLPullParser``.

    Unlike an iterator, this object defines no ``__next__``: ``__iter__``
    hands out the inner generator directly, so ``next()`` must be applied to
    ``iter(obj)`` rather than to the object itself.  Once the source has been
    read to the end, ``root`` holds the value returned by the parser's
    ``_close_and_return_root()``; until then it is ``None``.

    The generator refers back to the instance, which keeps the instance (and
    therefore the open file) alive while only the generator is being looped
    over.  For deterministic cleanup call ``close()`` or use the instance as
    a context manager.
    """

    __slots__ = '_source', '_source_opened', '_parser', 'root', '_it'

    def __init__(self, /, source, events=None, parser=None):
        # The flag must exist before open() is attempted: if open() raises,
        # __del__ still runs on the half-built object and calls close().
        self._source_opened = False
        if not hasattr(source, 'read'):
            self._source = open(source, 'rb')
            self._source_opened = True
        else:
            self._source = source
        self._parser = XMLPullParser(events=events, _parser=parser)
        self.root = None
        self._it = self._iterator()

    def __iter__(self, /):
        # Deliberately no __next__ on the class: returning the generator lets
        # the for-loop call the generator's own __next__ directly.
        return self._it

    def __enter__(self, /):
        """Return this object so it can be looped over inside ``with``."""
        return self

    def __exit__(self, /, *exc_info):
        """Close the source on leaving the ``with`` block; never suppress."""
        self.close()

    def _iterator(self, /):
        """Generate parser events, reading the source in 16 KiB chunks."""
        source = self._source
        parser = self._parser
        try:
            while data := source.read(16 * 1024):
                parser.feed(data)
                yield from parser.read_events()
            root = parser._close_and_return_root()
            # Deliver anything still queued after the parser was closed.
            yield from parser.read_events()
            self.root = root
        finally:
            # Runs on exhaustion, on a parse/read error, and when the
            # generator itself is closed.
            self.close()

    def close(self, /):
        """Close the source if this object opened it; safe to call twice."""
        if self._source_opened:
            self._source.close()
            self._source_opened = False

    def __del__(self, /):
        self.close()
def test_creation(file: str, impls: list[type], /, *, rounds: int = 5,
                  number: int = 100):
    """Time how long each implementation takes to construct a parser.

    For every entry of *impls* the statement ``iterparse(<file>)`` is timed
    with ``timeit.repeat``, the entry being bound to the name ``iterparse``.
    Entries only need to be callable with the path; plain functions are
    accepted as well as classes.

    Args:
        file: Path embedded (via ``repr``) into the timed statement.
        impls: Implementations to benchmark, in the order results are wanted.
        rounds: Number of timing rounds per implementation.
        number: Number of executions of the statement per round.

    Returns:
        A tuple with one list per implementation, each holding *rounds*
        timings in seconds.
    """
    code = f'iterparse({file!r})'
    return tuple(
        repeat(code, repeat=rounds, number=number, globals={'iterparse': impl})
        for impl in impls
    )


# The ``test_`` prefix matches pytest's default collection pattern; this is a
# benchmark helper that needs arguments, so keep pytest from running it.
test_creation.__test__ = False
def test_reading(file: str, impls: list[type], /, *, rounds: int = 5,
                 number: int = 10):
    """Time how long each implementation takes to iterate over a whole file.

    For every entry of *impls* the statement
    ``for _ in iterparse(<file>): pass`` is timed with ``timeit.repeat``, the
    entry being bound to the name ``iterparse``.  Entries only need to be
    callable with the path and return something iterable.

    Args:
        file: Path embedded (via ``repr``) into the timed statement.
        impls: Implementations to benchmark, in the order results are wanted.
        rounds: Number of timing rounds per implementation.
        number: Number of executions of the statement per round.

    Returns:
        A tuple with one list per implementation, each holding *rounds*
        timings in seconds.
    """
    code = f'for _ in iterparse({file!r}): pass'
    return tuple(
        repeat(code, repeat=rounds, number=number, globals={'iterparse': impl})
        for impl in impls
    )


# The ``test_`` prefix matches pytest's default collection pattern; this is a
# benchmark helper that needs arguments, so keep pytest from running it.
test_reading.__test__ = False
def main(file_path='20220425-FULL-1_1(xsd).xml'):
    """Benchmark all implementations and print the best time of each.

    Creation timings are measured and printed first, then reading timings.
    For every implementation the minimum over the timing rounds is shown.

    Args:
        file_path: XML document handed to every implementation; defaults to
            the sample file name the benchmark was written for.
    """
    impls = [iterparse1, iterparse2, iterparse3, iterparse_old]
    benchmarks = (('Creation', test_creation), ('Reading', test_reading))
    for label, benchmark in benchmarks:
        # Results come back in the same order as impls, so pair them up.
        for impl, timings in zip(impls, benchmark(file_path, impls)):
            print(f'{label} time of {impl.__name__}:', min(timings))


if __name__ == '__main__':
    main()
The test file can be downloaded here.