Difference in Measuring Latency of (torch.)Multiprocessing.Queue

I’m writing a program that requires transferring GPU tensors across processes in a pipeline manner.

I know that torch.multiprocessing automatically obtains the CUDA IPC memory handle (cudaIpcMemHandle_t) for me and sends it through the pipe under the hood of multiprocessing.Queue.

I wanted to measure the latency of each item as it goes through the Queue.

I implemented this code main_high.py :

The method is: when the producer puts an item into the Queue, it also adds a timestamp from time.perf_counter_ns() to the tuple being sent through the Queue.

import csv
import os
import time
from typing import Dict, List, Tuple

import torch
import torch.multiprocessing as mp


def producer(queue: mp.Queue, num_items: int) -> None:
    """Create one fresh CUDA tensor per item and push it through the queue.

    Each message is a tuple ("data", index, tensor, send_timestamp_ns);
    a final ("done",) sentinel signals the consumer to stop.
    """
    # 5 MiB worth of float32 values (4 bytes per element).
    element_count = (5 * 1024 * 1024) // 4
    payload_bytes = element_count * torch.tensor([], dtype=torch.float32).element_size()
    print(f"[producer] creating a new tensor per item, size={payload_bytes} bytes")

    for idx in range(num_items):
        payload = torch.rand(element_count, dtype=torch.float32, device="cuda")
        # Wait for the kernel filling the tensor to finish so the timestamp
        # marks the start of the transfer, not of tensor creation.
        torch.cuda.synchronize()

        send_ts = time.perf_counter_ns()
        print(f"start time, item {idx}: {send_ts} ns")
        queue.put(("data", idx, payload, send_ts))

        del payload

    queue.put(("done",))

def consumer(in_queue: mp.Queue) -> None:
    """Drain the queue, compute per-item latency, and write latencies_high.csv.

    Latency for an item is (receive timestamp - send timestamp carried inside
    the message), both taken with time.perf_counter_ns() in their respective
    processes.  Stops on the ("done",) sentinel.
    """
    latencies_ms: Dict[int, float] = {}
    while True:
        item = in_queue.get()
        # Capture the receive time immediately after get() so that our own
        # bookkeeping does not inflate the measured latency.
        ts_end = time.perf_counter_ns()
        if item[0] == "done":
            break

        _, item_index, tensor, ts_start = item
        latencies_ms[item_index] = (ts_end - ts_start) / 1e6
        # Drop the reference to the IPC-shared tensor as soon as possible.
        del tensor

    with open("latencies_high.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["item_index", "latency_ms"])
        for item_index, latency_ms in sorted(latencies_ms.items()):
            writer.writerow([item_index, latency_ms])

def main() -> None:
    """Wire one producer and one consumer together over a single queue."""
    mp.set_start_method("spawn", force=True)

    transfer_queue = mp.Queue()

    prod = mp.Process(target=producer, args=(transfer_queue, 50))
    cons = mp.Process(target=consumer, args=(transfer_queue,))

    for proc in (prod, cons):
        proc.start()
    for proc in (prod, cons):
        proc.join()


if __name__ == "__main__":
    main()

The problem is the measured latency of each item is very high:

item_index,latency_ms
0,130.301841
1,128.7352
2,128.617645
3,128.545633
.
.
.
48,127.844094
49,127.786926

IPC over pipes usually shouldn’t take hundreds of milliseconds, so I found another Stack Overflow thread that also measures this component.

Here’s a reproducible snippet main_low.py

import csv
import os
import time
from typing import Dict, List, Tuple

import torch
import torch.multiprocessing as mp


def producer(queue: mp.Queue, time_end_queue: mp.Queue, num_items: int) -> None:
    """Send CUDA tensors one at a time and measure acknowledged latency.

    For each item: record a local send timestamp, put the tensor on `queue`,
    then block on `time_end_queue` for the consumer's receive timestamp before
    producing the next item.  Both timestamps come from time.perf_counter_ns()
    (CLOCK_MONOTONIC), so they are comparable across processes on the same
    machine.  Per-item latencies (ms) are written to latencies_low.csv.
    """
    # float32 uses 4 bytes per element -> 5 MiB tensor.
    num_elements = (5 * 1024 * 1024) // 4

    latencies_ms: Dict[int, float] = {}
    start_times: Dict[int, int] = {}
    for item_index in range(num_items):
        tensor = torch.rand(num_elements, dtype=torch.float32, device="cuda")
        # Ensure tensor materialization is complete before starting transfer timing.
        torch.cuda.synchronize()

        ts_start = time.perf_counter_ns()
        start_times[item_index] = ts_start
        queue.put(("data", item_index, tensor))
        print(f"added item {item_index} to queue, queue size: {queue.qsize()}")

        # Block until the consumer echoes back its receive timestamp for this item.
        index, end_ts = time_end_queue.get()
        latencies_ms[item_index] = (end_ts - start_times[index]) / 1e6
        del tensor

    queue.put(("done",))

    with open("latencies_low.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["item_index", "latency_ms"])
        for item_index, latency_ms in sorted(latencies_ms.items()):
            writer.writerow([item_index, latency_ms])

def consumer(in_queue: mp.Queue, time_end_queue: mp.Queue) -> None:
    """Receive items and echo each receive timestamp back to the producer.

    For every ("data", index, tensor) message, a (index, receive_ts_ns) pair
    is put on `time_end_queue`.  Stops on the ("done",) sentinel.
    """
    while True:
        item = in_queue.get()
        # Capture the receive time immediately after get(); the original took
        # it after unpacking/bookkeeping, slightly inflating the measurement.
        end_ts = time.perf_counter_ns()
        if item[0] == "done":
            break

        _, item_index, tensor = item
        time_end_queue.put((item_index, end_ts))
        del tensor

def main() -> None:
    """Launch the producer/consumer pair with a data queue and an ack queue."""
    mp.set_start_method("spawn", force=True)

    transfer_queue = mp.Queue()
    time_end_queue = mp.Queue()

    prod = mp.Process(target=producer, args=(transfer_queue, time_end_queue, 50))
    cons = mp.Process(target=consumer, args=(transfer_queue, time_end_queue))

    for proc in (prod, cons):
        proc.start()
    for proc in (prod, cons):
        proc.join()


if __name__ == "__main__":
    main()

And the results turned out to be much more accurate:

item_index,latency_ms
0,106.522214
1,0.569158
2,0.391128
3,0.333409
.
.
.
47,0.310437
48,0.306123
49,0.307088

Here’s my problems with this difference:

  • I can’t think of any concrete reason why there should be such a huge difference in measurement.

  • In either case, a timestamp has to be sent through mp.Queue, so the issue shouldn’t be the cost of pickling the timestamp integer.

  • Another doubt is that the clocks of the two processes are not synchronized. But I checked that time.perf_counter_ns() uses CLOCK_MONOTONIC, which is system-wide on Linux, so the readings from the two processes shouldn’t be this far apart.