Sure, let me explain: VariableTracker
finds the variables to be traced (assigning some of them manually is also fine). LinearVisitor
records the line ranges of loops so that they can be skipped, and trace_func
only reports a variable when it is first defined or when its value differs from the one logged under the same name in a dict logger.
import ast
from collections import defaultdict
import sys
import os
import importlib.util
import linecache
import types
class VariableTracker(ast.NodeVisitor):
    """Collect the names bound in a parsed script and where each first appears.

    Attributes:
        all_vars: set of every registered name.
        vars_at_line: maps a line number to the names first registered there,
            in registration order.
        lineno_of_var: maps a name to the line number where it was first
            registered; unknown names map to ``float('inf')``.
    """

    def __init__(self):
        self.all_vars = set()
        self.vars_at_line = defaultdict(list)
        self.lineno_of_var = defaultdict(lambda: float('inf'))
        super().__init__()

    def _register(self, name, lineno):
        """Record ``name`` at ``lineno`` unless it has been seen before."""
        if name not in self.all_vars:
            self.vars_at_line[lineno].append(name)
            self.lineno_of_var[name] = lineno
            self.all_vars.add(name)

    def add_target_vars(self, target, lineno=None):
        """Register every plain name bound by ``target``.

        Args:
            target: an ``ast.Name``, ``ast.arg``, ``ast.Starred``,
                ``ast.Tuple``/``ast.List`` of targets, or a bare ``str`` name.
                Other node types (attributes, subscripts, ...) are ignored.
            lineno: line number to associate with newly seen names.
        """
        if isinstance(target, ast.Name):
            self._register(target.id, lineno)
        elif isinstance(target, ast.arg):
            # The conventional receiver names are not treated as variables.
            if target.arg not in ('self', 'cls'):
                self._register(target.arg, lineno)
        elif isinstance(target, ast.Starred):
            # ``a, *rest = ...`` binds ``rest`` through the wrapped target.
            self.add_target_vars(target.value, lineno)
        elif isinstance(target, (ast.Tuple, ast.List)):
            for elt in target.elts:
                self.add_target_vars(elt, lineno)
        elif isinstance(target, str):
            self._register(target, lineno)

    def visit_NamedExpr(self, node):  # ":=" in if, while, match, ...
        self.add_target_vars(node.target, node.lineno)
        self.generic_visit(node)

    def visit_Assign(self, node):
        for target in node.targets:
            self.add_target_vars(target, node.lineno)
        self.generic_visit(node)

    def visit_AnnAssign(self, node):
        # ``AnnAssign.target`` is a single node, not a list. A bare annotation
        # (``x: int``) has no value and binds nothing, so it is not registered.
        if node.value is not None:
            self.add_target_vars(node.target, node.lineno)
        self.generic_visit(node)

    def visit_For(self, node):
        self.add_target_vars(node.target, node.lineno)
        self.generic_visit(node)

    def visit_AsyncFor(self, node):
        self.add_target_vars(node.target, node.lineno)
        self.generic_visit(node)

    def visit_With(self, node):
        # Register the ``as`` targets of each context manager, if any.
        for item in node.items:
            if item.optional_vars is not None:
                self.add_target_vars(item.optional_vars, node.lineno)
        self.generic_visit(node)

    def visit_AsyncWith(self, node):
        self.visit_With(node)

    def visit_FunctionDef(self, node):
        # The function name itself is not considered a variable; only its
        # parameters are registered (all kinds, in signature order).
        arguments = node.args
        params = list(arguments.posonlyargs) + list(arguments.args)
        if arguments.vararg is not None:
            params.append(arguments.vararg)
        params.extend(arguments.kwonlyargs)
        if arguments.kwarg is not None:
            params.append(arguments.kwarg)
        for param in params:
            self.add_target_vars(param, node.lineno)
        self.generic_visit(node)

    def visit_AsyncFunctionDef(self, node):
        self.visit_FunctionDef(node)

    def visit_ClassDef(self, node):
        self.add_target_vars(node.name, node.lineno)
        self.generic_visit(node)
class LinearVisitor(ast.NodeVisitor):
    """Collect the (first line, last line) span of every loop in a parsed script.

    ``for``, ``async for`` and ``while`` statements are recorded, including
    nested ones, in the order the visitor encounters them.
    """

    def __init__(self):
        self.jump_linenos = []
        super().__init__()

    def _record_loop(self, node):
        """Store the loop's line span, then keep walking to find nested loops."""
        self.jump_linenos.append((node.lineno, node.end_lineno))
        self.generic_visit(node)

    def visit_For(self, node):
        self._record_loop(node)

    def visit_AsyncFor(self, node):
        # ``async for`` is a loop too; record it like the synchronous form.
        self._record_loop(node)

    def visit_While(self, node):
        self._record_loop(node)

    def get_jump_linenos(self):
        """Return the list of ``(start_lineno, end_lineno)`` loop spans."""
        return self.jump_linenos
def monitor_script(code, script_path):
    """Execute a script under a line tracer and print variable changes.

    The source is first analysed statically: ``VariableTracker`` yields the
    names to watch and ``LinearVisitor`` yields loop line ranges. The script is
    then executed as a module with ``sys.settrace`` active. On each ``line``
    event outside a loop range, every watched name present in the current
    frame's locals is compared with the last logged value; new or changed
    values are printed and attributed to the previously executed line.

    Args:
        code: source text of the script (used only for the static analysis).
        script_path: path of the script file that is actually executed.

    Returns:
        A dict mapping a line number to ``{name: value}`` for the changes
        attributed to that line (a later visit of the same line overwrites an
        earlier entry for the same name).

    NOTE: the logger keeps the value object itself, not a copy, so an in-place
    mutation of the same object compares equal to itself and is not reported.
    NOTE: changes are reported when the *next* line event arrives, so a change
    made by the very last traced line has no later event to report it.
    """
    # Static pass: only used to decide which names to watch and which line
    # ranges (loops) to skip.
    tree = ast.parse(code)
    vars_tracker = VariableTracker()
    vars_tracker.visit(tree)
    lineno_of_var = vars_tracker.lineno_of_var
    # A set iterates in arbitrary order; sort so reports are deterministic.
    to_trace_vars = sorted(vars_tracker.all_vars)

    linear_visitor = LinearVisitor()
    linear_visitor.visit(tree)
    jump_linenos = linear_visitor.get_jump_linenos()

    variables_logger = {}
    variables_history = defaultdict(dict)

    # Load the script as a module so it can be executed under the tracer.
    spec = importlib.util.spec_from_file_location("monitored_module", script_path)
    module = importlib.util.module_from_spec(spec)

    script_name = os.path.basename(script_path)

    # Information about the previously traced line.
    last_line = 0
    last_filename = script_path
    last_co_name = None

    def _has_changed(previous, current):
        """Return True when ``current`` differs from the logged ``previous``."""
        try:
            return bool(previous != current)
        except Exception:
            # Some objects cannot be compared or reduced to a single bool; an
            # exception escaping the trace function would abort the monitored
            # script, so fall back to an identity check.
            return previous is not current

    def _report_changes(local_vars):
        """Log and print watched names that are new or changed in ``local_vars``."""
        # Fetch the text of the previous line from the file it belongs to.
        last_code_line = linecache.getline(last_filename, last_line).strip()
        for var_name in to_trace_vars:
            if var_name not in local_vars or lineno_of_var[var_name] > last_line:
                continue
            current = local_vars[var_name]
            if var_name in variables_logger and not _has_changed(
                variables_logger[var_name], current
            ):
                continue
            variables_logger[var_name] = current
            variables_history[last_line][var_name] = current
            print(
                f"\nAfter executing {os.path.basename(last_filename)} line {last_line}'s code {last_co_name}: {last_code_line}, occurred this change:"
            )
            print(f" {var_name} = {current}")

    def trace_func(frame, event, arg):
        nonlocal last_line, last_filename, last_co_name
        if event == "line":
            lineno = frame.f_lineno
            # Loops are too complex, skip any line inside a loop range.
            in_loop = any(start <= lineno <= end for start, end in jump_linenos)
            # Only report when the previous line belonged to the script.
            if not in_loop and os.path.basename(last_filename) == script_name:
                _report_changes(frame.f_locals)
            last_line = lineno
            last_filename = frame.f_code.co_filename
            last_co_name = frame.f_code.co_name
        return trace_func

    # Remember any tracer that is already installed so it can be restored.
    previous_trace = sys.gettrace()
    sys.settrace(trace_func)
    try:
        print(f"Begin {script_path}...")
        spec.loader.exec_module(module)
        print(f"\n{script_path} Done")
    finally:
        sys.settrace(previous_trace)
    return dict(variables_history)
if __name__ == "__main__":
    # Script to monitor: first command-line argument, defaulting to "test.py".
    script_to_monitor = sys.argv[1] if len(sys.argv) > 1 else "test.py"
    # Python source defaults to UTF-8; do not depend on the locale encoding.
    with open(script_to_monitor, "r", encoding="utf-8") as f:
        code = f.read()
    monitor_script(code, script_to_monitor)