Trace after a line executes, not only before it

sys.settrace and Bdb.trace_dispatch can trace events. But the "line" event is only triggered before the execution of the line.
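To make the timing concrete, here is a minimal sketch. The names demo, tracer and seen are made up for illustration and are not part of any library. It records, for each 'line' event inside demo(), whether x is already present in the frame's locals.

```python
import sys

seen = []  # (line offset inside demo, is 'x' already in the frame's locals?)

def demo():
    x = 1
    y = x + 1
    return y

def tracer(frame, event, arg):
    # Only record 'line' events raised inside demo().
    if event == "line" and frame.f_code is demo.__code__:
        offset = frame.f_lineno - frame.f_code.co_firstlineno
        seen.append((offset, "x" in frame.f_locals))
    return tracer

sys.settrace(tracer)
try:
    demo()
finally:
    sys.settrace(None)

print(seen)
```

The entry for the x = 1 line should report that x is not in f_locals yet, because the event fires before the assignment runs.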

I’m trying to get information right after a line has executed, rather than before the next line executes. Right now I record each line along with info such as the filename, and mimic "after the line" behaviour by actually tracing before the next line. Rewriting the code with ast may also work. But both approaches carry a lot of overhead, especially when multiple modules are involved, and the last line cannot be traced with the next-line strategy (appending an extra pass to the end works in some cases).

Is it possible to add an after_line event? Or is there an elegant way to handle this case that I’m not aware of?

The next line strategy works, and the last line can be handled with the 'return' event instead.
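A rough sketch of that idea, assuming a single traced function. The names target, make_tracer, pending and reports are hypothetical. Each 'line' event reports the previous line as finished, and the 'return' event reports the last one.

```python
import sys

reports = []  # (line offset inside target, event that revealed the line finished)

def target():
    a = 1
    b = a + 1
    return b

def make_tracer(code):
    pending = [None]  # line that has started in this frame but is not reported yet

    def tracer(frame, event, arg):
        if frame.f_code is not code:
            return None  # this small demo ignores every other frame
        if event in ("line", "return"):
            if pending[0] is not None:
                reports.append((pending[0] - code.co_firstlineno, event))
            # A 'line' event starts a new pending line; 'return' ends the frame.
            pending[0] = frame.f_lineno if event == "line" else None
        return tracer

    return tracer

sys.settrace(make_tracer(target.__code__))
try:
    target()
finally:
    sys.settrace(None)

print(reports)
```

The last entry comes from the 'return' event, so no extra pass is needed at the end of the function. This sketch does not handle nested calls or exceptions.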

result = 0
filename = "temp.txt"
with open(filename, "r") as f:
    file_content = f.read()
result = 1
def get_1():
    return 1
x = get_1()
result = 2

pass

The behaviour of “the next line” strategy becomes weird when a function call is involved. I got this when tracing variables’ changes in the test.py file above:

After executing test.py line 1's code <module>: result = 0, occurred this change:
  result = 0

After executing test.py line 2's code <module>: filename = "temp.txt", occurred this change:
  filename = temp.txt

After executing <frozen codecs> line 325's code decode: , occurred this change:
  file_content = in [temp.txt]: test content to be written

After executing test.py line 5's code <module>: result = 1, occurred this change:
  result = 1

After executing test.py line 9's code <module>: result = 2, occurred this change:
  result = 2

After executing test.py line 9's code <module>: result = 2, occurred this change:
  x = 1

The change log for result suggests that my variable tracing is “right” (for names, linenos, etc.). But the timing of the annotations when there is a function call appears to be off.

Could this be an internal CPython optimization? I can see the logging order of x and result change when I run the trace multiple times.

Can you post your tracer code for us to reproduce the issue?


Sure, let me explain. VariableTracker finds the variables to be traced; assigning them manually is fine too. LinearVisitor records the line ranges of loops so they can be skipped. trace_func only logs a variable when it is first defined, or when its value differs from the one stored under the same name in the dict logger (variables_logger).

import ast
from collections import defaultdict
import sys
import os
import importlib.util
import linecache
import types

class VariableTracker(ast.NodeVisitor):
    def __init__(self):
        self.all_vars = set()
        self.vars_at_line = defaultdict(list)
        self.lineno_of_var = defaultdict(lambda: float('inf'))
        super().__init__()
    def add_target_vars(self, target, lineno=None):
        if isinstance(target, ast.Name):  # target.id is a str
            # print(target, type(target), target.id, isinstance(target.id, str))
            if target.id not in self.all_vars:
                self.vars_at_line[lineno].append(target.id)
                self.lineno_of_var[target.id] = lineno
            self.all_vars.add(target.id)
        elif isinstance(target, ast.arg):  # target.arg is a str
            # print(target, type(target), target.arg, isinstance(target.arg, str))
            if target.arg not in self.all_vars and target.arg not in ['self', 'cls']:
                self.vars_at_line[lineno].append(target.arg)
                self.lineno_of_var[target.arg] = lineno
            self.all_vars.add(target.arg)
        elif isinstance(target, ast.Tuple) or isinstance(target, ast.List):
            for elt in target.elts:
                self.add_target_vars(elt, lineno)
        elif isinstance(target, str):
            if target not in self.all_vars:
                self.vars_at_line[lineno].append(target)
                self.lineno_of_var[target] = lineno
            self.all_vars.add(target)
    def visit_NamedExpr(self, node):  # ":=" in if, while, match, ...
        self.add_target_vars(node.target, node.lineno)
        self.generic_visit(node)
    def visit_Assign(self, node):
        for target in node.targets:
            self.add_target_vars(target, node.lineno)
        self.generic_visit(node)
    def visit_AnnAssign(self, node):
        # AnnAssign has a single target node, not a list of targets
        self.add_target_vars(node.target, node.lineno)
        self.generic_visit(node)
    def visit_For(self, node):
        self.add_target_vars(node.target, node.lineno)
        self.generic_visit(node)
    def visit_AsyncFor(self, node):
        self.add_target_vars(node.target, node.lineno)
        self.generic_visit(node)
    def visit_FunctionDef(self, node):
        # function name not considered as a variable
        # self.add_target_vars(node.name, node.lineno)
        for arg in node.args.args:
            self.add_target_vars(arg, node.lineno)
        self.generic_visit(node)
    def visit_AsyncFunctionDef(self, node):
        self.visit_FunctionDef(node)
    def visit_ClassDef(self, node):
        self.add_target_vars(node.name, node.lineno)
        self.generic_visit(node)


class LinearVisitor(ast.NodeVisitor):
    def __init__(self):
        self.jump_linenos = []
        super().__init__()

    def visit_For(self, node):
        self.jump_linenos.append((node.lineno, node.end_lineno))
        self.generic_visit(node)

    def visit_While(self, node):
        self.jump_linenos.append((node.lineno, node.end_lineno))
        self.generic_visit(node)

    def get_jump_linenos(self):
        return self.jump_linenos


def monitor_script(code, script_path):
    # only to get the variables to be traced, manually set them is ok.
    tree = ast.parse(code)
    vars_tracker = VariableTracker()
    vars_tracker.visit(tree)
    lineno_of_var = vars_tracker.lineno_of_var
    to_trace_vars = vars_tracker.all_vars

    linear_visitor = LinearVisitor()
    linear_visitor.visit(tree)
    jump_linenos = linear_visitor.get_jump_linenos()

    variables_logger = {}
    variables_history = defaultdict(dict)

    # find script module
    spec = importlib.util.spec_from_file_location("monitored_module", script_path)
    module = importlib.util.module_from_spec(spec)

    # cache last line's info
    last_line = [0]
    last_filename = [script_path]
    last_co_name = [None]

    def trace_func(frame, event, arg):
        if event == "line":
            # get basic info
            lineno = frame.f_lineno
            filename = frame.f_code.co_filename
            co_name = frame.f_code.co_name
            last_code_line = linecache.getline(filename, last_line[0]).strip()
            # loops are too complex, skip them
            for s, e in jump_linenos:
                if s <= lineno <= e:
                    last_line[0] = lineno
                    last_filename[0] = filename
                    last_co_name[0] = co_name
                    return trace_func

            # only trace the script
            if os.path.basename(last_filename[0]) == os.path.basename(script_path):
                # only local variables
                local_vars = frame.f_locals

                # log and print
                for var_name in to_trace_vars:
                    if (
                        var_name in local_vars
                        and lineno_of_var[var_name] <= last_line[0]
                    ):
                        if var_name not in variables_logger:
                            # When a var is first defined
                            variables_logger[var_name] = local_vars[var_name]
                            variables_history[last_line[0]][var_name] = local_vars[
                                var_name
                            ]
                            print(
                                f"\nAfter executing {last_filename[0].split('/')[-1]} line {last_line[0]}'s code {last_co_name[0]}: {last_code_line}, occurred this change:"
                            )
                            print(f"  {var_name} = {local_vars[var_name]}")
                        elif variables_logger[var_name] != local_vars[var_name]:
                            # When a var is changed
                            variables_logger[var_name] = local_vars[var_name]
                            variables_history[last_line[0]][var_name] = local_vars[
                                var_name
                            ]
                            print(
                                f"\nAfter executing {last_filename[0].split('/')[-1]} line {last_line[0]}'s code {last_co_name[0]}: {last_code_line}, occurred this change:"
                            )
                            print(f"  {var_name} = {local_vars[var_name]}")
            last_line[0] = lineno
            last_filename[0] = filename
            last_co_name[0] = co_name
        return trace_func

    sys.settrace(trace_func)

    try:
        print(f"Begin {script_path}...")
        spec.loader.exec_module(module)
        print(f"\n{script_path} Done")
    finally:
        sys.settrace(None)

if __name__ == "__main__":
    script_to_monitor = "test.py"
    with open(script_to_monitor, "r") as f:
        code = f.read()
    monitor_script(code, script_to_monitor)

Additionally, though not directly related: to execute a dynamic code string we often use compile and exec. But when we want to find and load it as a module, this doesn’t follow the finder and loader protocol of PEP 302 and PEP 420, so we have to implement our own Finder and Loader. For dynamically running a module that is relatively isolated from the current environment, this may be too complex. We can also write the code to disk and use spec_from_file_location, but that incurs I/O overhead.

Maybe importlib.util could gain a spec_from_code_str function, similar to spec_from_file_location, so that we can dynamically find and load a code string without defining a new meta hook.

The drawback is safety: executing a module from a string is powerful but unsafe.
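For reference, something close to this can already be sketched with the existing importlib.util.spec_from_loader and a small custom loader, without registering a meta hook. StringLoader and module_from_source below are hypothetical names, not part of importlib, and the sketch does not add the module to sys.modules.

```python
import importlib.abc
import importlib.util

class StringLoader(importlib.abc.Loader):
    """Hypothetical loader that executes source code held in memory."""

    def __init__(self, source, filename="<string>"):
        self.source = source
        self.filename = filename

    def create_module(self, spec):
        return None  # fall back to the default module creation

    def exec_module(self, module):
        # Same safety caveat as any exec of untrusted source.
        code = compile(self.source, self.filename, "exec")
        exec(code, module.__dict__)

def module_from_source(name, source):
    """Hypothetical helper: build a spec from a loader, then create and run the module."""
    loader = StringLoader(source)
    spec = importlib.util.spec_from_loader(name, loader)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

mod = module_from_source("monitored_module", "result = 1 + 1\n")
print(mod.result)
```

Because the source never touches the disk, linecache cannot look up its lines by filename unless they are registered separately, which matters for a tracer that prints source lines.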

With your tracer code posted, it is clear that the arbitrarily changing logging order of x and result is simply due to your use of a set, an unordered container, to keep track of the variables.
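One way to keep the variable names unique but in a stable order is to use a dict as an ordered set. A small sketch, with the variable names taken from test.py and the container names made up:

```python
# Variable names in the order they first appear in the traced script.
names_in_source_order = ["result", "filename", "file_content", "x"]

# A set of strings has no guaranteed iteration order; with hash randomization
# enabled (the default), the order can differ from one interpreter run to the next.
unordered = set(names_in_source_order)

# dict keys keep insertion order (guaranteed since Python 3.7),
# so a dict can stand in for an ordered set.
ordered = dict.fromkeys(names_in_source_order)

print(list(ordered))
```

Iterating over ordered instead of a set in trace_func would make the log order reproducible between runs.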

You should also use a stack to keep track of the last line numbers: push a line number onto the stack on a 'call' event and pop it on a 'return' event. That way the top of the stack points to the actual last line in the current scope, rather than the last line executed, which may be in a function call that is returning.

Thank you! Let me try to summarize my mistakes:

  1. set, unlike dict (which has maintained insertion order since 3.7), is unordered.
  2. A 'line' event is triggered before the line executes, and it does not suppress other events that occur within that line, so 'call', 'return', etc. must be taken into account.
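The second point can be checked by recording the raw event sequence. This is a small sketch with made-up names (main, get_1, events), not taken from the tracer above:

```python
import sys

events = []  # (event name, name of the function whose frame raised it)

def get_1():
    return 1

def main():
    x = get_1()
    return x

def tracer(frame, event, arg):
    events.append((event, frame.f_code.co_name))
    return tracer

sys.settrace(tracer)
try:
    main()
finally:
    sys.settrace(None)

for entry in events:
    print(entry)
```

The 'call', 'line' and 'return' events of get_1 all arrive between the 'line' event for x = get_1() and the next 'line' event in main, which is why a single "last line" slot gets overwritten by the callee.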

Hmm, your summary doesn’t seem to include my main point, which is actually to use a stack to return to the previous state after a call.

Here’s an example:

import sys
import linecache

def get_tracer():
    def tracer(frame, event, arg):
        if event == 'call':
            lineno_locals.append(())
        elif event in ('line', 'return'):
            if last_lineno_locals := lineno_locals[-1]:
                last_lineno, last_locals = last_lineno_locals
                for name, _ in set.difference(*(
                    {(name, id(obj)) for name, obj in ns.items()}
                    for ns in (frame.f_locals, last_locals)
                )):
                    print(
                        f"After line {last_lineno}:",
                        linecache.getline(frame.f_code.co_filename, last_lineno),
                        f' {name} = {frame.f_locals[name]}'
                    )
            if event == 'return':
                lineno_locals.pop()
            else:
                lineno_locals[-1] = frame.f_lineno, frame.f_locals.copy()
        return tracer
    lineno_locals = [()]
    return tracer

so that:

sys._getframe().f_trace = tracer = get_tracer()
sys.settrace(tracer)

result = 1
def get_1():
    a = 0
    return a
result = 2
x = get_1()
result = 3

outputs something like:

After line 31: result = 1
  result = 1
After line 32: def get_1():
  get_1 = <function get_1 at 0x7a4e6f2d54e0>
After line 35: result = 2
  result = 2
After line 33:     a = 0
  a = 0
After line 36: x = get_1()
  x = 0
After line 37: result = 3
  result = 3

Demo here


Thanks for the clarification & the example! I actually did adopt a stack to solve mistake 2; I just didn’t mention it explicitly. 🙈
Thanks again. And your code, especially the way you organize it, is super helpful!
