Add ElementTree.iterparse.close; remove function and class creation on call

I created three new `ET.iterparse` (`xml.etree.ElementTree.iterparse`) implementations that do not create a new function and a new class on each call and that have a `close` method (issue).

A quick description of all of them:

  • The 1st is fully compatible, but reads more slowly than the current implementation.
  • The 2nd is compatible too, but in cases like the following the file is closed only at garbage collection, not when the exception is caught:
try:
    # the iterparse2 object is anonymous here, so close() cannot be called on it
    for event, el in iterparse2(file):
        ...
except Exception: pass
# go on -- the file opened by iterparse2 is only closed at garbage collection

This can be fixed by adding `__enter__` and `__exit__`, but using them requires an additional level of indentation:

# assumes iterparse2 has been given the proposed __enter__/__exit__ methods
with iterparse2(file) as it:
    for ev, el in it:
        pass

This implementation reads faster than the 1st one, but still more slowly than the current one.

  • The 3rd implementation is like the 1st, but does not define `__next__` and returns the inner iterator from `__iter__`. This change allows faster reading, but is not compatible (the current implementation is an iterator).
  • All implementations have a faster creation time, but a slower read time.
  • Each implementation closes the source file on object deletion.
code
from timeit import repeat
from xml.etree.ElementTree import XMLPullParser, iterparse as iterparse_old


class iterparse1:
    """Incrementally parse XML from *source*, yielding the parser's events.

    The instance is its own iterator.  Once iteration has run to completion
    the value returned by the pull parser on closing is stored in ``root``;
    until then ``root`` is ``None``.

    Parameters:
        source: an object with a ``read`` method, used as-is and never
            closed by this class, or anything accepted by ``open()``,
            which is then opened in binary mode and closed by ``close()``.
        events: forwarded to ``XMLPullParser`` as ``events``.
        parser: forwarded to ``XMLPullParser`` as ``_parser``.
    """
    __slots__ = '_source', '_source_opened', '_parser', 'root', '_it'

    def __init__(self, /, source, events=None, parser=None):
        # The flag must be set before opening the source: if open() fails the
        # half-built object is discarded and __del__ -> close() reads the flag.
        self._source_opened = False

        if not hasattr(source, 'read'):
            self._source = open(source, 'rb')
            self._source_opened = True
        else:
            self._source = source

        self._parser = XMLPullParser(events=events, _parser=parser)
        self.root = None
        # The generator body does not run until the first __next__ call.
        self._it = self._iterator()

    def __iter__(self, /):
        """Return the instance itself; it implements ``__next__``."""
        return self

    def _iterator(self, /):
        """Generator that reads 16 KiB chunks and yields the parser's events."""
        source = self._source
        parser = self._parser
        try:
            data = source.read(16 * 1024)
            while data:
                parser.feed(data)
                yield from parser.read_events()
                data = source.read(16 * 1024)

            root = parser._close_and_return_root()
            # Drain anything queued while closing the parser (the original
            # author was unsure whether any events can appear here).
            yield from parser.read_events()
            # Published only after every event has been delivered.
            self.root = root
        finally:
            # Runs on normal exhaustion, on an error, and on generator close.
            self.close()

    def __next__(self, /):
        """Return the next event from the inner generator."""
        return next(self._it)

    def close(self, /):
        """Close the source if this object opened it; safe to call repeatedly."""
        if self._source_opened:
            self._source.close()
            self._source_opened = False

    def __del__(self, /):
        self.close()


class iterparse2:
    """Incrementally parse XML from *source* without an inner generator.

    The instance is its own iterator: ``__next__`` pulls events from the
    pull parser and reads another 16 KiB chunk whenever the pending events
    run out.  ``root`` is ``None`` until iteration finishes.

    Parameters:
        source: an object with a ``read`` method, used as-is and never
            closed by this class, or anything accepted by ``open()``,
            which is then opened in binary mode and closed by ``close()``.
        events: forwarded to ``XMLPullParser`` as ``events``.
        parser: forwarded to ``XMLPullParser`` as ``_parser``.
    """
    __slots__ = '_source', '_source_opened', '_parser', '_root', 'root', '_it', '_exhausted'

    _empty_tuple = ()

    def __init__(self, /, source, events=None, parser=None):
        # The flag must be set before opening the source: if open() fails the
        # half-built object is discarded and __del__ -> close() reads the flag.
        self._source_opened = False

        if not hasattr(source, 'read'):
            self._source = open(source, 'rb')
            self._source_opened = True
        else:
            self._source = source

        self._parser = XMLPullParser(events=events, _parser=parser)
        self._root = self.root = None
        # Start with no pending events so the first __next__ reads a chunk.
        self._it = iter(self._empty_tuple)
        self._exhausted = False

    def __iter__(self, /):
        """Return the instance itself; it implements ``__next__``."""
        return self

    def __next__(self, /):
        """Return the next parser event.

        Raises:
            StopIteration: when the source and the parser are exhausted;
                ``root`` is published and the source is closed first.

        Any exception raised while reading or parsing closes a source opened
        by this object and marks the iterator as finished before it
        propagates, so the file does not stay open until garbage collection.
        """
        try:
            event = next(self._it, None)
            while event is None and not self._exhausted:
                data = self._source.read(16 * 1024)
                if data:
                    self._parser.feed(data)
                    self._it = self._parser.read_events()
                else:
                    self._root = self._parser._close_and_return_root()
                    # Drain anything queued while closing the parser.
                    self._it = self._parser.read_events()
                    self._exhausted = True

                event = next(self._it, None)
        except BaseException:
            # Behave like a generator that raised: release the file and make
            # every later __next__ call end with StopIteration.
            self._it = iter(self._empty_tuple)
            self._exhausted = True
            self.close()
            raise

        if event is None:
            self.root = self._root
            self.close()
            raise StopIteration

        return event

    def close(self, /):
        """Close the source if this object opened it; safe to call repeatedly."""
        if self._source_opened:
            self._source.close()
            self._source_opened = False

    def __del__(self, /):
        self.close()


class iterparse3:
    """XML event reader whose ``__iter__`` hands out the inner generator.

    The object defines no ``__next__`` of its own; iterating it walks the
    generator created in ``__init__``.  ``root`` stays ``None`` until that
    generator has been run to completion.
    """
    __slots__ = '_source', '_source_opened', '_parser', 'root', '_it'

    def __init__(self, /, source, events=None, parser=None):
        # The flag has to exist before open() can fail, because __del__
        # calls close(), which reads it.
        self._source_opened = False

        if hasattr(source, 'read'):
            # Caller-owned stream: used as-is, never closed here.
            self._source = source
        else:
            self._source = open(source, 'rb')
            self._source_opened = True

        self._parser = XMLPullParser(events=events, _parser=parser)
        self.root = None
        self._it = self._iterator()

    def _iterator(self, /):
        """Feed the parser chunk by chunk, yielding its events as they appear."""
        stream = self._source
        pull = self._parser
        chunk_size = 16 * 1024
        try:
            while chunk := stream.read(chunk_size):
                pull.feed(chunk)
                yield from pull.read_events()

            tree_root = pull._close_and_return_root()
            # Deliver whatever is still queued before publishing the root.
            yield from pull.read_events()
            self.root = tree_root
        finally:
            self.close()

    def __iter__(self, /):
        """Return the inner generator rather than the object itself."""
        return self._it

    # Deliberately no __next__: iteration is delegated to the inner generator.

    def close(self, /):
        """Close the source when this object opened it; otherwise do nothing."""
        if not self._source_opened:
            return
        self._source.close()
        self._source_opened = False

    def __del__(self, /):
        self.close()


def test_creation(file: str, impls: list[type], /):
    code = f'iterparse({file!r})'

    return tuple(
        repeat(
            code,
            repeat=5,
            number=100,
            globals=dict(iterparse=t),
            )
        for t in impls
        )


def test_reading(file: str, impls: list[type], /):
    code = f'for _ in iterparse({file!r}): pass'

    return tuple(
        repeat(
            code,
            repeat=5,
            number=10,
            globals=dict(iterparse=t),
            )
        for t in impls
        )


def main():
    """Benchmark creation and reading, printing the best timing per implementation."""
    file_path = '20220425-FULL-1_1(xsd).xml'
    impls = [iterparse1, iterparse2, iterparse3, iterparse_old]

    # Results come back in the same order as ``impls``, so pair them up.
    for impl, timings in zip(impls, test_creation(file_path, impls)):
        print(f'Creation time of {impl.__name__}:', min(timings))

    for impl, timings in zip(impls, test_reading(file_path, impls)):
        print(f'Reading time of {impl.__name__}:', min(timings))


# Run the benchmark only when this file is executed as a script.
if __name__ == '__main__':
    main()

Test file can be grabbed here.

I wrote a 4th and, in my opinion, the most successful implementation. It is fully compatible and should be better optimized for both time and memory.

4th
class iterparse4:
    """Iterator over XML parser events that keeps only the inner generator.

    The pull parser is handed straight to the generator instead of being
    stored on the instance.  ``root`` is ``None`` until iteration has run to
    completion.
    """
    __slots__ = '_source', '_source_opened', 'root', '_it'

    def __init__(self, /, source, events=None, parser=None):
        # The flag has to exist before open() can fail, because __del__
        # calls close(), which reads it.
        self._source_opened = False

        if not hasattr(source, 'read'):
            self._source = open(source, 'rb')
            self._source_opened = True
        else:
            # Caller-owned stream: used as-is, never closed here.
            self._source = source

        self.root = None
        pull_parser = XMLPullParser(events=events, _parser=parser)
        self._it = self._iterator(pull_parser)

    def __iter__(self, /):
        """Return the instance itself; it implements ``__next__``."""
        return self

    def _iterator(self, parser: XMLPullParser, /):
        """Feed *parser* chunk by chunk, yielding its events as they appear."""
        stream = self._source
        chunk_size = 16 * 1024
        try:
            while chunk := stream.read(chunk_size):
                parser.feed(chunk)
                yield from parser.read_events()

            tree_root = parser._close_and_return_root()
            # Deliver whatever is still queued before publishing the root.
            yield from parser.read_events()
            self.root = tree_root
        finally:
            self.close()

    def __next__(self, /):
        """Return the next event from the inner generator."""
        return self._it.__next__()

    def close(self, /):
        """Close the source when this object opened it; otherwise do nothing."""
        if not self._source_opened:
            return
        self._source.close()
        self._source_opened = False

    def __del__(self, /):
        self.close()

I also updated the tests. It looks like traverse time is actually the same in all implementations (the current one included), but creation time is definitely increased.

testing
def test_creation(file: str, impls: list[type], /):
    code = f'iterparse({file!r})'

    return tuple(
        repeat(
            code,
            repeat=50,
            number=1000,
            globals=dict(iterparse=t),
            )
        for t in impls
        )


def test_traverse(file: str, impls: list[type], /):
    setup = f'it = iterparse({file!r})'
    code = f'for _ in it: pass'

    return tuple(
        repeat(
            code,
            setup=setup,
            repeat=50,
            number=10,
            globals=dict(iterparse=t),
            )
        for t in impls
        )


def main():
    """Benchmark creation and traversal, printing the best timing per implementation."""
    file_path = '20220609-FULL-1_1(xsd).xml'
    impls = [iterparse1, iterparse2, iterparse3, iterparse4, iterparse_old]

    # Results come back in the same order as ``impls``, so pair them up.
    for impl, timings in zip(impls, test_creation(file_path, impls)):
        print(f'Creation time of {impl.__name__}:', min(timings))

    print()

    for impl, timings in zip(impls, test_traverse(file_path, impls)):
        print(f'Traverse time of {impl.__name__}:', min(timings))

I just wanted to point out that I had no idea what you were talking about. Only after you sent the second post here did I google it and realize that it is: xml.etree.ElementTree — The ElementTree XML API.

Using just the two-letter abbreviation ET makes your post very hard to find and to recognize at a quick glance.

1 Like

I agree and fixed the title.

1 Like