A new version.
Click on “Open PDF”, select the PDF, click OK. It shows only one page at a time, which forces you to review the rows.
Click “Next page” or press Shift+PageDown for the next page.
Click “Extract page” or press F12 to extract the rows from the page.
Click “Extract to this page” or press Shift+F12 to extract the rows from the pages up to and including the current page.
As I said before, it’s quicker to step through multiple pages, fixing any errors you find and/or modifying the pattern, and then click “Extract to this page”.
The rows are written to a CSV file.
Once I got into my stride, I managed to extract the rows from all 200 pages (more than 14,000 rows) in a little over 10 minutes.
#!python3
# -*- encoding: utf-8 -*-
# M R A Barnett
# December 2023
from contextlib import suppress
from csv import DictWriter
from os import mkdir, scandir
from os.path import dirname, getsize, isdir, isfile, join, normpath, splitext
from PyPDF2 import PdfReader
from queue import Empty, Queue
from threading import Thread
from time import sleep
from tkinter.filedialog import askopenfilename
import json
import re
import tkinter as tk
import tkinter.font as tkfont
import tkinter.ttk as ttk
# Default row pattern: eleven named groups (marca, modelo, periodo, cc, cilind,
# gd, pkw, cvf, co2, cv, valor), each separated from the next by a single space.
# The "periodo" group is optional, but the spaces around it are still required.
# When rows are extracted, the group names become the CSV column names.
INITIAL_PATTERN = r"""(?P<marca>[A-Z]+(?:\+\S+)?) (?P<modelo>.*) (?P<periodo>\d{4}-(?:\d{4})?)? (?P<cc>\d*) (?P<cilind>\d{0,2}) (?P<gd>\S*) (?P<pkw>\S*) (?P<cvf>\S*) (?P<co2>\S*) (?P<cv>\d+(?:,\d)?) (?P<valor>\d+)"""
# Tooltip class from https://pythonexamples.org/python-tkinter-label-tooltip/
# Modified to keep the tooltip fully on-screen.
class Tooltip:
    """Show a one-line pop-up while the mouse pointer is over a widget.

    Creating an instance is enough: it binds itself to the widget's
    <Enter> and <Leave> events.
    """

    def __init__(self, widget, text):
        """Attach a tooltip showing *text* to *widget*."""
        self.widget = widget
        self.text = text
        # The pop-up window, or None while the tooltip is hidden.
        self.tooltip = None
        self.widget.bind("<Enter>", self.show)
        self.widget.bind("<Leave>", self.hide)

    def show(self, event=None):
        """Create the pop-up just below and to the right of the widget."""
        if self.tooltip is not None:
            # A second <Enter> without a <Leave> must not create (and then
            # orphan) another window.
            return
        # bbox() can return None, e.g. when there is nothing to measure.
        bbox = self.widget.bbox("insert")
        x, y = bbox[:2] if bbox else (0, 0)
        x += self.widget.winfo_rootx() + 25
        y += self.widget.winfo_rooty() + 25
        # Keep the tooltip fully on-screen.
        try:
            font = tkfont.nametofont(self.widget["font"])
        except tk.TclError:
            # The widget has no font option, or its font is not a named font.
            font = tkfont.nametofont("TkDefaultFont")
        text_width = font.measure(self.text)
        screen_width = self.widget.winfo_screenwidth()
        # Never push the window past the left edge, even for very long text.
        x = max(0, min(x, screen_width - text_width - 8))
        self.tooltip = tk.Toplevel(self.widget)
        self.tooltip.wm_overrideredirect(True)
        self.tooltip.wm_geometry(f"+{x}+{y}")
        label = ttk.Label(self.tooltip, text=self.text, background="#ffffe0", relief="solid", borderwidth=1)
        label.pack()

    def hide(self, event=None):
        """Destroy the pop-up if it is currently shown."""
        if self.tooltip:
            self.tooltip.destroy()
            self.tooltip = None
class App(tk.Tk):
    """Main window of the PDF row extractor.

    The text of the opened PDF is shown one page at a time in an editable text
    box.  Every match of the regular expression in the "Pattern" entry is
    highlighted.  The extract commands append the named groups of those
    matches to a CSV file next to the PDF and remove the matched text from the
    page.  The text of each page is cached in a folder named after the PDF,
    and the pattern is stored in a JSON file next to this script.
    """

    # Interval between two runs of on_tick, in milliseconds.
    TICK_MS = 250

    def __init__(self):
        """Build the widgets, bind the shortcut keys and start the poll timer."""
        tk.Tk.__init__(self)
        self.title("Extract data")
        try:
            self.state("zoomed")
        except tk.TclError:
            # Tk on X11 does not accept the "zoomed" state; ask the window
            # manager to maximise the window instead.
            with suppress(tk.TclError):
                self.attributes("-zoomed", True)

        # State used by the callbacks.  It is set up before any widget or
        # variable exists because writing the pattern variable runs
        # highlight_text() and show_match() through its trace.
        self.pdf_path = None
        self.csv_path = None
        self.pages = []            # Text of every page loaded so far.
        self.pages_queue = Queue() # Pages handed over by the reader thread.
        self.cur_page = 0          # Zero-based index of the page on display.
        self.cursor_pos = (0, 0)   # Last seen (line, column) of the cursor.
        self.matches = []          # ((line, col), (line, col), match) tuples.

        # Only the text box (row 3, column 0) absorbs extra space.
        self.grid_columnconfigure(0, weight=1)
        self.grid_columnconfigure(1, weight=0)
        for row in range(5):
            self.grid_rowconfigure(row, weight=1 if row == 3 else 0)

        # Page number and buttons.
        frame = tk.Frame(self)
        frame.grid(row=0, column=0, columnspan=2, sticky="we")

        # Displays the page number.
        self.location = tk.Label(frame)
        self.location.pack(side="left")

        # The buttons.  They are packed from the right, so the first one
        # created ends up rightmost.
        self.extract_page_button = tk.Button(frame, text="Extract page", command=self.on_extract_page)
        self.extract_page_button.pack(side="right")
        Tooltip(self.extract_page_button, "Extract rows from this page (F12)")

        self.extract_upto_page_button = tk.Button(
            frame, text="Extract to this page", command=self.on_extract_upto_page
        )
        self.extract_upto_page_button.pack(side="right")
        Tooltip(
            self.extract_upto_page_button,
            "Extract rows from pages up to and including this page (Shift+F12)",
        )

        # The two page buttons run the same handlers as the keyboard
        # shortcuts named in their tooltips.
        next_page_button = tk.Button(frame, text="Next page", command=self.on_shift_page_down)
        next_page_button.pack(side="right")
        Tooltip(next_page_button, "Next page (Shift+PageDown)")

        prev_page_button = tk.Button(frame, text="Previous page", command=self.on_shift_page_up)
        prev_page_button.pack(side="right")
        Tooltip(prev_page_button, "Previous page (Shift+PageUp)")

        next_match_button = tk.Button(frame, text="Next match", command=self.on_next_match)
        next_match_button.pack(side="right")
        Tooltip(next_match_button, "Find page with next match (F3)")

        open_button = tk.Button(frame, text="Open PDF...", command=self.on_open_pdf)
        open_button.pack(side="right")
        Tooltip(open_button, "Open PDF")

        # The pattern.
        frame = tk.Frame(self)
        frame.grid(row=1, column=0, columnspan=2, sticky="we")
        tk.Label(frame, text="Pattern:").pack(side="left")
        self.pattern_var = tk.StringVar()
        self.pattern_var.trace_add("write", self.on_pattern_change)
        tk.Entry(frame, textvariable=self.pattern_var).pack(side="left", fill="x", expand=True)

        # The match, if any.
        frame = tk.Frame(self)
        frame.grid(row=2, column=0, columnspan=2, sticky="we")
        tk.Label(frame, text="Match:").pack(side="left")
        self.match_var = tk.StringVar()
        self.match_entry = tk.Entry(frame, state="readonly", textvariable=self.match_var)
        self.match_entry.pack(side="left", fill="x", expand=True)

        # The text box that contains the page.
        yscrollbar = tk.Scrollbar(self, orient="vertical")
        yscrollbar.grid(row=3, column=1, sticky="ns")
        xscrollbar = tk.Scrollbar(self, orient="horizontal")
        xscrollbar.grid(row=4, column=0, sticky="we")
        self.textbox = tk.Text(
            self,
            undo=True,
            maxundo=-1,
            yscrollcommand=yscrollbar.set,
            xscrollcommand=xscrollbar.set,
        )
        self.textbox.grid(row=3, column=0, sticky="nswe")
        self.textbox.tag_configure("highlight", background="yellow")
        self.textbox.bind("<<Modified>>", self.text_changed)
        self.textbox.focus()
        yscrollbar.config(command=self.textbox.yview)
        xscrollbar.config(command=self.textbox.xview)

        # Keyboard shortcuts.
        self.bind("<F3>", self.on_next_match)
        self.bind("<Shift-Prior>", self.on_shift_page_up)
        self.bind("<Shift-Next>", self.on_shift_page_down)
        self.bind("<F12>", self.on_extract_page)
        self.bind("<Shift-F12>", self.on_extract_upto_page)

        # Restore the pattern saved by the previous session; load_config()
        # falls back to INITIAL_PATTERN when there is none.
        self.config_path = splitext(__file__)[0] + ".json"
        self.load_config()

        self.protocol("WM_DELETE_WINDOW", self.on_quit)
        self.on_tick()

    def _show_current_page(self):
        """Put the current page into the text box and update the page label."""
        self.textbox.delete("1.0", "end")
        if self.pages:
            self.location["text"] = f"Page {self.cur_page + 1} of {len(self.pages)}"
            self.textbox.insert("1.0", self.pages[self.cur_page])
        else:
            self.location["text"] = ""
        self.textbox.mark_set("insert", "1.0")

    def _replace_text(self, page):
        """Replace the text box contents with *page* and clear the match display."""
        self.textbox.delete("1.0", "end")
        self.textbox.insert("1.0", page)
        self.textbox.mark_set("insert", "1.0")
        # (0, 0) is never a real cursor position, so the next tick refreshes
        # the match display.
        self.cursor_pos = (0, 0)
        self.show_match()

    def on_open_pdf(self):
        """Ask for a PDF, load its cached pages and read the rest in a thread."""
        path = askopenfilename(initialdir=dirname(__file__), filetypes=[("PDF file", "*.pdf")])
        if not path:
            return
        self.pdf_path = normpath(path)
        page_folder = splitext(self.pdf_path)[0]
        self.csv_path = page_folder + ".csv"
        with suppress(FileExistsError):
            mkdir(page_folder)
        self.load_pages(page_folder)
        # Reset the index before anything reads self.pages[self.cur_page];
        # the index left over from a previous PDF may be out of range.
        self.cur_page = 0
        with open(self.pdf_path, "rb") as pdf_read:
            num_pages = len(PdfReader(pdf_read).pages)
        if len(self.pages) < num_pages:
            # Extract the pages that are not cached yet in the background;
            # on_tick() collects them from the queue.
            self.thread = Thread(
                target=self.reader_func, args=(len(self.pages), num_pages), daemon=True
            )
            self.thread.start()
        # Show whatever is already available, even if only part is cached.
        self._show_current_page()

    def load_pages(self, page_folder):
        """Load the cached "Page N.*" files of *page_folder* into self.pages.

        The pages are ordered by their number N.  Entries that are not files,
        or whose name has no integer after "Page ", are ignored.
        """
        numbered = []
        with scandir(page_folder) as entries:
            for entry in entries:
                if not entry.name.startswith("Page ") or not entry.is_file():
                    continue
                try:
                    page_number = int(splitext(entry.name)[0].partition(" ")[2])
                except ValueError:
                    # Not one of our page files, e.g. "Page notes.txt".
                    continue
                with open(entry.path, encoding="utf-8") as file:
                    numbered.append((page_number, file.read()))
        numbered.sort()
        self.pages = [page for _, page in numbered]

    def save_pages(self):
        """Write every loaded page to "Page N.txt" in the PDF's page folder."""
        page_folder = splitext(self.pdf_path)[0]
        for page_number, page in enumerate(self.pages, start=1):
            page_path = join(page_folder, f"Page {page_number}.txt")
            with open(page_path, "w", encoding="utf-8") as file:
                file.write(page)

    def on_shift_page_up(self, event=None):
        """Go to the previous page."""
        self.go_to_page(self.cur_page - 1)
        return "break"

    def on_shift_page_down(self, event=None):
        """Go to the next page."""
        self.go_to_page(self.cur_page + 1)
        return "break"

    def go_to_page(self, new_page):
        """Show page *new_page* (zero-based); do nothing if it is out of range.

        The text currently in the text box is stored back into self.pages
        first, so edits made to the page are kept.
        """
        if not 0 <= new_page < len(self.pages):
            return
        self.pages[self.cur_page] = self.textbox.get("1.0", "end-1c")
        self.cur_page = new_page
        self._show_current_page()

    def text_changed(self, event=None):
        """Re-highlight the matches after the text box contents changed."""
        if not self.textbox.edit_modified():
            # Resetting the flag below raises <<Modified>> once more; there
            # is nothing new to highlight for that second event.
            return
        self.highlight_text()
        self.textbox.edit_modified(False)

    def highlight_text(self):
        """Highlight every match of the pattern in the text box.

        Rebuilds self.matches as ((line, col), (line, col), match) tuples in
        document order, using Tk text coordinates (lines start at 1, columns
        at 0), and refreshes the match display.  An empty or invalid pattern
        leaves the page without highlights.
        """
        self.matches = []
        # Always drop the old highlights, also when the pattern was cleared.
        self.textbox.tag_remove("highlight", "1.0", "end")
        pattern_text = self.pattern_var.get()
        if not pattern_text.strip():
            self.show_match()
            return
        try:
            pattern = re.compile(pattern_text)
        except re.error:
            self.show_match()
            return
        page = self.textbox.get("1.0", "end-1c")

        # Line number and offset of the start of the line reached so far.
        # Matches arrive in increasing order, so the scan never has to go back.
        line_number = 1
        line_start = 0

        def locate(offset):
            """Return (line, column) of *offset*, advancing the running state."""
            nonlocal line_number, line_start
            newline = page.rfind("\n", line_start, offset)
            if newline >= 0:
                line_number += page.count("\n", line_start, newline + 1)
                line_start = newline + 1
            return line_number, offset - line_start

        for m in pattern.finditer(page):
            start = locate(m.start())
            end = locate(m.end())
            self.matches.append((start, end, m))
            self.textbox.tag_add("highlight", "%d.%d" % start, "%d.%d" % end)
        self.show_match()

    def on_extract_page(self, event=None):
        """Extract the rows of the page on display and show what is left."""
        page = self.extract_from_page(self.textbox.get("1.0", "end-1c"))
        if page is not None:
            self._replace_text(page)
        return "break"

    def on_extract_upto_page(self, event=None):
        """Extract the rows of every page up to and including the current one."""
        if not self.pages:
            return "break"
        self.pages[self.cur_page] = self.textbox.get("1.0", "end-1c")
        for page_number in range(self.cur_page + 1):
            remainder = self.extract_from_page(self.pages[page_number])
            if remainder is None:
                # Invalid pattern: leave the pages as they are instead of
                # replacing them with None.
                return "break"
            self.pages[page_number] = remainder
        self._replace_text(self.pages[self.cur_page])
        return "break"

    def extract_from_page(self, page):
        """Append the matches found in *page* to the CSV file.

        Returns the page text with the matched rows removed, the page
        unchanged when there is nothing to extract or no CSV file to write
        to, or None when the pattern does not compile.
        """
        try:
            pattern = re.compile(self.pattern_var.get())
        except re.error:
            return None
        if self.csv_path is None:
            # No PDF has been opened, so there is no CSV file to write to.
            return page
        extracted = []
        remainder = []
        cur_pos = 0
        for m in pattern.finditer(page):
            remainder.append(page[cur_pos : m.start()])
            extracted.append(m.groupdict(default=""))
            cur_pos = m.end()
        remainder.append(page[cur_pos:])
        if not extracted:
            return page
        # The named groups of the pattern are the CSV columns.
        fieldnames = tuple(pattern.groupindex)
        if fieldnames:
            # Without named groups there are no columns, and writing the
            # rows would only add blank lines to the CSV file.
            csv_mode = "a" if isfile(self.csv_path) and getsize(self.csv_path) > 0 else "w"
            with open(self.csv_path, csv_mode, newline="", encoding="UTF-8") as csv_file:
                writer = DictWriter(csv_file, fieldnames=fieldnames)
                if csv_mode == "w":
                    writer.writeheader()
                writer.writerows(extracted)
        # Tidy what is left: blank out space-only lines and collapse runs of
        # empty lines.
        page = "\n".join(remainder)
        page = re.sub(r"(?m)^ +$", "", page)
        page = re.sub(r"\n{2,}", r"\n\n", page)
        return page

    def show_match(self):
        """Display the groups of the match under the cursor, if any.

        self.matches is in document order, so the match is found with a
        binary search on the cursor position.
        """
        display = ""
        low, high = 0, len(self.matches)
        while low < high:
            mid = (low + high) // 2
            start, end, match = self.matches[mid]
            if self.cursor_pos < start:
                high = mid
            elif self.cursor_pos > end:
                low = mid + 1
            else:
                display = str(match.groupdict())
                break
        self.match_var.set(display)

    def on_pattern_change(self, *args):
        """Trace callback: re-highlight when the pattern is edited."""
        self.highlight_text()

    def on_tick(self):
        """Periodic poll: track the cursor and collect pages from the reader."""
        try:
            cur_pos = tuple(map(int, self.textbox.index("insert").split(".")))
            if cur_pos != self.cursor_pos:
                self.cursor_pos = cur_pos
                self.show_match()
            had_pages = bool(self.pages)
            added_page = False
            with suppress(Empty):
                while True:
                    self.pages.append(self.pages_queue.get_nowait())
                    added_page = True
            if added_page:
                if had_pages:
                    self.location["text"] = f"Page {self.cur_page + 1} of {len(self.pages)}"
                else:
                    # The first pages have arrived: put one on display.
                    self._show_current_page()
        finally:
            # Keep polling even if one tick failed.
            self.after(self.TICK_MS, self.on_tick)

    def on_quit(self):
        """Save the pages, tidy the CSV file, save the pattern and close."""
        try:
            # self.pages is still empty if the reader thread has not
            # delivered anything yet.
            if self.pdf_path and self.pages:
                self.pages[self.cur_page] = self.textbox.get("1.0", "end-1c")
                self.save_pages()
            if self.csv_path and isfile(self.csv_path):
                self.tidy_rows(self.csv_path)
            self.save_config()
        finally:
            # Whatever went wrong above, the window must still close.
            self.destroy()

    def load_config(self):
        """Set the pattern from the JSON config file.

        Falls back to INITIAL_PATTERN when the file is missing, is not valid
        JSON or holds no pattern.
        """
        try:
            with open(self.config_path, encoding="utf-8") as file:
                config = json.load(file)
        except (FileNotFoundError, json.JSONDecodeError):
            config = {}
        pattern = config.get("pattern") if isinstance(config, dict) else None
        if not isinstance(pattern, str) or not pattern:
            pattern = INITIAL_PATTERN
        self.pattern_var.set(pattern)

    def save_config(self):
        """Write the current pattern to the JSON config file."""
        config = {"pattern": self.pattern_var.get()}
        with open(self.config_path, "w", encoding="utf-8") as file:
            json.dump(config, file)

    def tidy_rows(self, csv_path):
        """Remove duplicate rows from the CSV file and sort the rest.

        The first line is taken to be the header row and stays first; the
        other lines are sorted case-insensitively.
        """
        with open(csv_path, encoding="utf-8") as file:
            rows = file.readlines()
        if not rows:
            return
        # A last line without a newline would otherwise merge with the line
        # that follows it after sorting.
        rows = [row if row.endswith("\n") else row + "\n" for row in rows]
        header = rows[0]
        # The raw row is the tie-breaker, which keeps the order deterministic
        # for rows that differ only in case.
        body = sorted(set(rows[1:]) - {header}, key=lambda row: (row.casefold(), row))
        with open(csv_path, "w", encoding="utf-8") as file:
            file.write(header)
            file.writelines(body)

    def reader_func(self, from_page, to_page):
        """Thread body: extract the text of pages [from_page, to_page).

        Each page is written to its cache file and then put on the queue
        that on_tick() drains.
        """
        page_folder = splitext(self.pdf_path)[0]
        with open(self.pdf_path, "rb") as pdf_read:
            pdf = PdfReader(pdf_read)
            for page_number in range(from_page, to_page):
                # Extract before opening the file, so a failed extraction
                # does not leave an empty file that would later be loaded as
                # a cached page.
                page = pdf.pages[page_number].extract_text()
                text_path = join(page_folder, f"Page {page_number + 1}.txt")
                with open(text_path, mode="w", encoding="UTF-8") as text_file:
                    text_file.write(page)
                self.pages_queue.put(page)

    def on_next_match(self, event=None):
        """Go to the next page that contains a match; stay put if none does."""
        try:
            pattern = re.compile(self.pattern_var.get())
        except re.error:
            return "break"
        later_pages = range(self.cur_page + 1, len(self.pages))
        target = next((n for n in later_pages if pattern.search(self.pages[n])), None)
        if target is not None:
            self.go_to_page(target)
        return "break"
# Create the main window and run the Tk event loop.
App().mainloop()