Problem Summary
We run a Python worker in Kubernetes. The main process receives a tracking job from NATS and runs the tracking in a separate worker process via ProcessPoolExecutor.
The tracking code receives LiveKit text stream messages from OCI Vision. Each message contains JSON with bounding boxes and base64-encoded image/frame data.
On macOS, memory looks stable. On Linux/Kubernetes, the memory of the pool_worker process keeps increasing until the pod is OOM-killed.
How Tracking Is Started
Simplified code:
# main worker process
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
_POOL = ProcessPoolExecutor(
    max_workers=1,
    mp_context=mp.get_context("forkserver"),  # Linux path
)

future = _POOL.submit(
    run_agent_in_subprocess,
    "tracking",
    participantJobInfo,
)
Inside the pool worker:
def run_agent_in_subprocess(agent_name, participantJobInfo):
    if agent_name == "tracking":
        import asyncio
        from app.agents.tracking.tracking import create_tracking_task

        asyncio.run(create_tracking_task(participantJobInfo))
Tracking Text Stream Handler
OCI sends tracking data through a LiveKit text stream:
room.register_text_stream_handler(
    "agentChat",
    my_text_stream_handler(frame_buffer),
)
Handler:
import asyncio
import json

def my_text_stream_handler(frame_buffer: asyncio.Queue):
    inflight_tasks: set[asyncio.Task] = set()

    def handler(text_reader, participant_identity: str):
        async def read_full_stream():
            data = await text_reader.read_all()
            info = getattr(text_reader, "info", None)
            attrs = getattr(info, "attributes", None) if info else None
            meta_json = attrs.get("frame_meta") if attrs else None
            if not meta_json:
                return

            meta = json.loads(meta_json)
            message_id = meta["message_id"]
            frame_index = meta["frame_index"]
            total_frames = meta["total_frames"]

            # message_buffers is a plain dict, so create the entry on first chunk
            buffers = message_buffers.setdefault(message_id, {})
            buffers[frame_index] = data
            message_totals[message_id] = total_frames
            if len(buffers) != total_frames:
                return  # still waiting for more chunks

            full_payload = "".join(buffers[i] for i in range(total_frames))
            message_buffers.pop(message_id, None)
            message_totals.pop(message_id, None)
            parsed_data = json.loads(full_payload)

            # Evict the oldest queued frame so the queue stays at maxsize
            if frame_buffer.full():
                frame_buffer.get_nowait()
                frame_buffer.task_done()
            frame_buffer.put_nowait({
                "parsed_data": parsed_data,
            })

        task = asyncio.create_task(read_full_stream())
        inflight_tasks.add(task)
        task.add_done_callback(inflight_tasks.discard)

    return handler
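Each completed message is briefly held in memory several times over (the chunk strings, the joined `full_payload`, the parsed dict), and the base64 text that stays in the queue is roughly 4/3 the size of the decoded image. One option is to decode the frame once and drop the base64 text before enqueueing. A minimal sketch; the field name `image_b64` and the helper `slim_payload` are hypothetical, since the actual OCI message schema isn't shown here.

```python
import base64
from typing import Any


def slim_payload(parsed: dict[str, Any], b64_key: str = "image_b64") -> dict[str, Any]:
    """Return a copy of `parsed` with the base64 text replaced by raw bytes.

    `b64_key` is a hypothetical field name; adjust it to the real schema.
    """
    slim = {k: v for k, v in parsed.items() if k != b64_key}
    encoded = parsed.get(b64_key)
    if encoded is not None:
        # Decoded bytes are about 3/4 the size of the base64 text; once the
        # caller drops `parsed`, the large str can be freed.
        slim["image"] = base64.b64decode(encoded)
    return slim


frame = {"image_b64": base64.b64encode(b"\x89PNG...").decode("ascii"), "boxes": [[1, 2, 3, 4]]}
print(slim_payload(frame))
```

In the handler this would be used as `frame_buffer.put_nowait({"parsed_data": slim_payload(parsed_data)})`, and `process_frame` would then need to accept the decoded bytes under the new key.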
Frame processor:
def frame_buffer_processor(frame_buffer, agent_identity, room_name):
    async def processor():
        while True:
            message = await frame_buffer.get()
            try:
                parsed_data = message["parsed_data"]
                await process_frame(parsed_data, agent_identity, room_name)
            finally:
                frame_buffer.task_done()

    return processor()
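glibc malloc can keep freed memory in its arenas instead of returning it to the kernel, while macOS uses a different allocator, so a Linux-only RSS climb is at least consistent with allocator retention. One quick experiment is to call glibc's `malloc_trim(0)` periodically alongside the frame processor and watch whether RSS drops. A minimal sketch, assuming a glibc-based image; `trim_heap` and `periodic_trim` are hypothetical names, and `trim_heap` returns None when `malloc_trim` cannot be found (e.g. on macOS). Setting the environment variable `MALLOC_ARENA_MAX=2` on the container is a related, configuration-only experiment.

```python
import asyncio
import ctypes
import ctypes.util
import functools
from typing import Optional


@functools.lru_cache(maxsize=None)
def _load_libc() -> Optional[ctypes.CDLL]:
    # find_library can fail in slim images, so also try the usual glibc soname.
    for name in (ctypes.util.find_library("c"), "libc.so.6"):
        if not name:
            continue
        try:
            return ctypes.CDLL(name)
        except OSError:
            continue
    return None


def trim_heap() -> Optional[bool]:
    """Ask glibc to return free heap memory to the OS.

    True if memory was released, False if not, None if malloc_trim is missing.
    """
    libc = _load_libc()
    if libc is None:
        return None
    try:
        malloc_trim = libc.malloc_trim
    except AttributeError:
        return None  # not glibc, e.g. macOS
    malloc_trim.argtypes = [ctypes.c_size_t]
    malloc_trim.restype = ctypes.c_int
    return bool(malloc_trim(0))


async def periodic_trim(interval_s: float = 30.0) -> None:
    # Start next to the processor, e.g. asyncio.create_task(periodic_trim()).
    while True:
        await asyncio.sleep(interval_s)
        trim_heap()
```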
Memory Structures
We keep these module-level dictionaries for message reassembly:
message_buffers = {}
message_totals = {}
message_started_at = {}
message_bytes = {}
We remove completed messages after reassembly, and we prune old incomplete messages.
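The reassembly state spread across `message_buffers`, `message_totals`, and `message_started_at` can be kept in one object, so that completion and pruning always remove every piece of state for a message together. A minimal sketch, assuming stale messages are identified by a first-seen timestamp and a fixed timeout; `ChunkAssembler` and `max_age_s` are hypothetical names, not part of the current code.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class _Pending:
    total: int
    started_at: float
    chunks: dict[int, str] = field(default_factory=dict)


class ChunkAssembler:
    """Reassemble chunked text messages and drop ones that never complete."""

    def __init__(self, max_age_s: float = 10.0) -> None:
        self.max_age_s = max_age_s
        self._pending: dict[str, _Pending] = {}

    def add(self, message_id: str, index: int, total: int, data: str,
            now: Optional[float] = None) -> Optional[str]:
        """Store one chunk; return the full payload once all chunks arrived."""
        if not 0 <= index < total:
            raise ValueError(f"chunk index {index} out of range for total {total}")
        now = time.monotonic() if now is None else now
        entry = self._pending.get(message_id)
        if entry is None:
            entry = self._pending[message_id] = _Pending(total, now)
        entry.chunks[index] = data
        if len(entry.chunks) < entry.total:
            return None
        # Complete: remove all state for this message before returning.
        del self._pending[message_id]
        return "".join(entry.chunks[i] for i in range(entry.total))

    def prune(self, now: Optional[float] = None) -> int:
        """Drop incomplete messages older than max_age_s; return how many."""
        now = time.monotonic() if now is None else now
        stale = [mid for mid, e in self._pending.items()
                 if now - e.started_at > self.max_age_s]
        for mid in stale:
            del self._pending[mid]
        return len(stale)

    def __len__(self) -> int:
        return len(self._pending)
```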
What We See
The growing process is not the main process. It is the ProcessPoolExecutor worker process:
python3.12 -m app.main # main process, mostly stable
pool_worker # tracking runs here, RSS keeps increasing
On Linux/Kubernetes, RSS keeps increasing even though:
- frame_buffer is bounded
- old queue items are evicted
- completed message_buffers entries are removed
- async tasks are removed from inflight_tasks after completion
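To correlate the growth with message traffic, the pool worker can log its own resident set size, for example every N frames. A minimal sketch, assuming Linux (it reads `/proc/self/statm`, whose second field is the number of resident pages); `current_rss_bytes` is a hypothetical helper.

```python
import os
from typing import Optional


def current_rss_bytes() -> Optional[int]:
    """Current RSS of this process in bytes, or None where /proc is absent."""
    try:
        with open("/proc/self/statm") as f:
            resident_pages = int(f.read().split()[1])
    except (OSError, IndexError, ValueError):
        return None  # e.g. macOS has no /proc
    return resident_pages * os.sysconf("SC_PAGE_SIZE")


if __name__ == "__main__":
    rss = current_rss_bytes()
    print("RSS unavailable" if rss is None else f"RSS: {rss / 2**20:.1f} MiB")
```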
Question
Could this be caused by:
- ProcessPoolExecutor worker reuse
- Linux fork/forkserver behavior
- glibc malloc arena memory retention
- large base64 JSON strings
- native memory from LiveKit / OCI SDK / protobuf
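One way to separate these candidates is to compare Python-level allocations with RSS over the same interval. `tracemalloc` only sees memory allocated through Python's own allocators, so if RSS keeps rising while the traced total stays flat, the growth is more likely native (library buffers) or allocator retention than live Python objects such as the base64 strings. A minimal sketch; `PyHeapTracker` is a hypothetical name.

```python
import tracemalloc


class PyHeapTracker:
    """Compare current Python-level allocations against a baseline snapshot."""

    def __init__(self, frames: int = 10) -> None:
        if not tracemalloc.is_tracing():
            tracemalloc.start(frames)
        self._baseline = tracemalloc.take_snapshot()

    def traced_bytes(self) -> int:
        # Memory currently held via Python allocators (not native malloc).
        current, _peak = tracemalloc.get_traced_memory()
        return current

    def top_growth(self, limit: int = 5) -> list[str]:
        # Source lines whose allocations grew the most since the baseline.
        snapshot = tracemalloc.take_snapshot()
        stats = snapshot.compare_to(self._baseline, "lineno")
        return [str(s) for s in stats[:limit]]


if __name__ == "__main__":
    tracker = PyHeapTracker()
    blobs = [bytearray(1000) for _ in range(1000)]  # simulate growth
    print(tracker.traced_bytes())
    for line in tracker.top_growth(3):
        print(line)
```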