ProcessPoolExecutor worker memory is not released on Linux/Kubernetes

Problem Summary
We run a Python worker in Kubernetes. The main process receives a tracking job from NATS and starts tracking using ProcessPoolExecutor.

The tracking agent receives messages from OCI Vision through a LiveKit text stream. Each message is JSON containing bounding boxes and base64-encoded image/frame data.

On macOS, memory looks stable. On Linux/Kubernetes, the memory of the pool_worker process keeps increasing until the pod is OOM-killed.

How Tracking Is Started
Simplified code:

```python
# main worker process
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

_POOL = ProcessPoolExecutor(
    max_workers=1,
    mp_context=mp.get_context("forkserver"),  # Linux path
)

future = _POOL.submit(
    run_agent_in_subprocess,
    "tracking",
    participantJobInfo,
)
```

Inside the pool worker:

```python
def run_agent_in_subprocess(agent_name, participantJobInfo):
    if agent_name == "tracking":
        import asyncio
        from app.agents.tracking.tracking import create_tracking_task

        asyncio.run(create_tracking_task(participantJobInfo))
```

Tracking Text Stream Handler
OCI sends tracking data through a LiveKit text stream:

```python
room.register_text_stream_handler(
    "agentChat",
    my_text_stream_handler(frame_buffer),
)
```

Handler:

```python
import asyncio
import json


def my_text_stream_handler(frame_buffer: asyncio.Queue):
    # Strong references so running tasks are not garbage-collected mid-flight.
    inflight_tasks: set[asyncio.Task] = set()

    def handler(text_reader, participant_identity: str):
        async def read_full_stream():
            data = await text_reader.read_all()

            info = getattr(text_reader, "info", None)
            attrs = getattr(info, "attributes", None) if info else None
            meta_json = attrs.get("frame_meta") if attrs else None

            if not meta_json:
                return

            meta = json.loads(meta_json)
            message_id = meta["message_id"]
            frame_index = meta["frame_index"]
            total_frames = meta["total_frames"]

            # message_buffers is a plain dict, so create the entry on first chunk.
            buffers = message_buffers.setdefault(message_id, {})
            buffers[frame_index] = data
            message_totals[message_id] = total_frames

            if len(buffers) != total_frames:
                return

            full_payload = "".join(buffers[i] for i in range(total_frames))

            # Message complete: drop its entry from every reassembly dict.
            message_buffers.pop(message_id, None)
            message_totals.pop(message_id, None)
            message_started_at.pop(message_id, None)
            message_bytes.pop(message_id, None)

            parsed_data = json.loads(full_payload)

            # Bounded queue: evict the oldest frame when it is full.
            if frame_buffer.full():
                frame_buffer.get_nowait()
                frame_buffer.task_done()

            frame_buffer.put_nowait({
                "parsed_data": parsed_data,
            })

        task = asyncio.create_task(read_full_stream())
        inflight_tasks.add(task)
        task.add_done_callback(inflight_tasks.discard)

    return handler
```
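Because every frame travels as base64 text inside JSON, the handler briefly holds several full copies of each frame. A minimal sketch of the size arithmetic, assuming a hypothetical 1 MB raw frame (the post does not state real frame sizes) and a hypothetical 64,000-character chunk size:

```python
import base64
import json
import sys

# Hypothetical 1 MB raw frame; real OCI Vision frame sizes are not stated.
raw_frame = bytes(1_000_000)
b64_text = base64.b64encode(raw_frame).decode("ascii")

# Base64 maps every 3 input bytes (rounded up) to 4 output characters.
assert len(b64_text) == 4 * ((len(raw_frame) + 2) // 3)

# Simulate the handler: chunks arrive, are joined, then parsed as JSON.
chunks = [b64_text[i:i + 64_000] for i in range(0, len(b64_text), 64_000)]
payload = json.dumps({"boxes": [], "frame": "".join(chunks)})
parsed = json.loads(payload)

# While all three are referenced, each holds about one full copy of the frame.
sizes = {
    "chunks": sum(sys.getsizeof(c) for c in chunks),
    "payload": sys.getsizeof(payload),
    "parsed frame": sys.getsizeof(parsed["frame"]),
}
for name, size in sizes.items():
    print(f"{name:>12}: {size / 1e6:.2f} MB")
```

In the handler above, the local `buffers` dict and `full_payload` stay referenced until `read_full_stream` returns, so the chunks, the joined string and the parsed copy coexist while `json.loads` runs. That raises peak usage per message but should not by itself cause steady growth. Objects larger than 512 bytes bypass CPython's small-object allocator and go to the C allocator, so on Linux how quickly that memory is reused or returned depends on glibc.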

Frame processor:

```python
def frame_buffer_processor(frame_buffer, agent_identity, room_name):
    async def processor():
        while True:
            message = await frame_buffer.get()

            try:
                parsed_data = message["parsed_data"]
                await process_frame(parsed_data, agent_identity, room_name)
            finally:
                frame_buffer.task_done()

    return processor()
```

Memory Structures
We keep these module-level dictionaries for message reassembly:

```python
message_buffers = {}
message_totals = {}
message_started_at = {}
message_bytes = {}
```

We remove completed messages after reassembly, and we prune old incomplete messages.
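The pruning code is not shown above. A minimal sketch of age-based pruning, assuming `message_started_at` stores a `time.monotonic()` timestamp per message id; the 10-second `INCOMPLETE_MESSAGE_TTL_S` and the `prune_incomplete` helper are hypothetical:

```python
from __future__ import annotations

import time

# Reassembly dictionaries, as in the post.
message_buffers: dict[str, dict[int, str]] = {}
message_totals: dict[str, int] = {}
message_started_at: dict[str, float] = {}
message_bytes: dict[str, int] = {}

# Hypothetical timeout; the post does not say which value is used.
INCOMPLETE_MESSAGE_TTL_S = 10.0


def prune_incomplete(now: float | None = None) -> int:
    """Drop partially received messages older than the TTL; return the count."""
    now = time.monotonic() if now is None else now
    stale = []
    # Check every id present in either dict, so a message whose start time
    # was never recorded gets a timestamp now and is pruned on a later pass.
    for mid in set(message_buffers) | set(message_started_at):
        started = message_started_at.setdefault(mid, now)
        if now - started > INCOMPLETE_MESSAGE_TTL_S:
            stale.append(mid)
    for mid in stale:
        # Remove the id from every dict so no bookkeeping entry outlives it.
        message_buffers.pop(mid, None)
        message_totals.pop(mid, None)
        message_started_at.pop(mid, None)
        message_bytes.pop(mid, None)
    return len(stale)
```

The helper has to be called periodically, for example from a timer task in the pool worker's event loop. Iterating over the union of keys matters: if only `message_started_at` were scanned, a buffer created without a start time would never be pruned.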

What We See
The growing process is not the main process. It is the ProcessPoolExecutor worker process:

```
python3.12 -m app.main   # main process, mostly stable
pool_worker              # tracking runs here, RSS keeps increasing
```

On Linux/Kubernetes, RSS keeps increasing even though:

  • frame_buffer is bounded

  • old queue items are evicted

  • completed message_buffers entries are removed

  • async tasks are removed from inflight_tasks after completion
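To tell the hypotheses below apart, it helps to know whether the growth is in memory Python itself allocated or outside it. A minimal sketch, assuming Linux (`/proc/self/status`) and that the snapshot is logged periodically from inside the pool worker; `rss_bytes` and `memory_snapshot` are hypothetical helpers, and the 5 MB string only stands in for a reassembled payload:

```python
from __future__ import annotations

import os
import tracemalloc


def rss_bytes() -> int | None:
    """Current resident set size from /proc (Linux only); None elsewhere."""
    try:
        with open("/proc/self/status", encoding="utf-8", errors="replace") as f:
            for line in f:
                if line.startswith("VmRSS:"):
                    return int(line.split()[1]) * 1024  # value is in kB
    except OSError:
        pass
    return None


def memory_snapshot() -> dict:
    """Compare process RSS with the memory tracemalloc has seen allocated."""
    current, peak = tracemalloc.get_traced_memory()
    return {"pid": os.getpid(), "rss": rss_bytes(),
            "py_traced": current, "py_traced_peak": peak}


tracemalloc.start()
blob = "x" * 5_000_000  # stand-in for a reassembled payload
print(memory_snapshot())
del blob
print(memory_snapshot())
```

If `rss` keeps rising while `py_traced` stays flat, the extra memory is outside what tracemalloc sees: native allocations from extension modules such as the LiveKit SDK or protobuf, or free memory the C allocator has not returned to the OS. If `py_traced` rises as well, `tracemalloc.take_snapshot()` and `Snapshot.compare_to()` can show which source lines hold it. Tracing adds overhead, so this is better suited to a debugging run than to production.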

Question
Could this be caused by:

  • ProcessPoolExecutor worker reuse

  • Linux fork/forkserver behavior

  • glibc malloc arena memory retention

  • large base64 JSON strings

  • native memory from LiveKit / OCI SDK / protobuf
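The glibc hypothesis can be checked by asking glibc to hand free heap memory back to the OS and watching whether RSS drops. A minimal sketch using `ctypes`, assuming a glibc-based container image; `trim_native_heap` is a hypothetical helper, and `malloc_trim` is a glibc extension that macOS's allocator does not provide:

```python
from __future__ import annotations

import ctypes


def trim_native_heap() -> bool | None:
    """Ask glibc to release free heap memory back to the OS.

    Returns True if glibc released memory, False if it could not release
    any, and None where malloc_trim is unavailable (e.g. macOS).
    """
    try:
        # dlopen(NULL): symbols of the running process, including its libc.
        libc = ctypes.CDLL(None)
        malloc_trim = libc.malloc_trim
    except (OSError, AttributeError, TypeError):
        return None
    malloc_trim.argtypes = [ctypes.c_size_t]
    malloc_trim.restype = ctypes.c_int
    return malloc_trim(0) == 1


print(trim_native_heap())
```

If RSS falls noticeably after a call, the memory was free but retained by the allocator rather than leaked; if it does not, live references or native leaks are more likely. A related glibc setting is the `MALLOC_ARENA_MAX` environment variable, which caps the number of malloc arenas; it is read when the process starts, so it would have to be set in the pod environment, from where the main process and its pool workers inherit it.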