I’m writing a program that requires transferring GPU tensors across processes in a pipeline manner.
I know that torch.multiprocessing automatically obtains the CUDA IPC memory handle (cudaIpcMemHandle_t) for me and sends it through the pipe under the hood of multiprocessing.Queue.
I wanted to measure the latency of each item going through the Queue.
I implemented this code main_high.py :
The method is when the producer put an item to the Queue, it will also add a timestamp time.perf_counter_ns() in the tuple being sent through Queue.
import csv
import os
import time
from typing import Dict, List, Tuple
import torch
import torch.multiprocessing as mp
def producer(queue: mp.Queue, num_items: int) -> None:
    """Generate `num_items` CUDA tensors and push each onto `queue`.

    Each data item is a ("data", index, tensor, start_ns) tuple, where
    start_ns is a time.perf_counter_ns() timestamp taken just before the
    put, so the consumer can compute one-way transfer latency.  A final
    ("done",) sentinel signals the end of the stream.
    """
    # float32 uses 4 bytes per element -> 5 MiB tensor.
    n_elem = (5 * 1024 * 1024) // 4
    n_bytes = n_elem * torch.empty(0, dtype=torch.float32).element_size()
    print(f"[producer] creating a new tensor per item, size={n_bytes} bytes")
    for idx in range(num_items):
        payload = torch.rand(n_elem, dtype=torch.float32, device="cuda")
        # Ensure tensor materialization is complete before starting transfer timing.
        torch.cuda.synchronize()
        sent_ns = time.perf_counter_ns()
        print(f"start time, item {idx}: {sent_ns} ns")
        queue.put(("data", idx, payload, sent_ns))
        del payload
    queue.put(("done",))
def consumer(in_queue: mp.Queue) -> None:
    """Receive items from `in_queue`, record per-item one-way latency, and
    write the results to latencies_high.csv.

    Each data item is a ("data", item_index, tensor, ts_start) tuple whose
    ts_start is a time.perf_counter_ns() timestamp taken by the producer;
    a ("done",) sentinel terminates the loop.
    """
    latencies_ms: Dict[int, float] = {}
    while True:
        item = in_queue.get()
        # Timestamp immediately after the blocking get so tuple unpacking
        # is not charged to the transfer latency.
        ts_end = time.perf_counter_ns()
        if item[0] == "done":
            break
        _, item_index, tensor, ts_start = item
        latencies_ms[item_index] = (ts_end - ts_start) / 1e6
        # Drop the reference promptly so the shared storage can be released.
        del tensor
    with open("latencies_high.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["item_index", "latency_ms"])
        for item_index, latency_ms in sorted(latencies_ms.items()):
            writer.writerow([item_index, latency_ms])
def main() -> None:
    """Spawn a producer and a consumer sharing one queue; wait for both."""
    mp.set_start_method("spawn", force=True)
    transfer_queue = mp.Queue()
    workers = [
        mp.Process(target=producer, args=(transfer_queue, 50)),
        mp.Process(target=consumer, args=(transfer_queue,)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main()
The problem is the measured latency of each item is very high:
item_index,latency_ms
0,130.301841
1,128.7352
2,128.617645
3,128.545633
.
.
.
48,127.844094
49,127.786926
Usually, IPC over pipes shouldn't take hundreds of milliseconds. So I found another Stack Overflow thread that also measures this component.
Here’s a reproducible snippet main_low.py
import csv
import os
import time
from typing import Dict, List, Tuple
import torch
import torch.multiprocessing as mp
def producer(queue: mp.Queue, time_end_queue: mp.Queue, num_items: int) -> None:
    """Send `num_items` CUDA tensors through `queue`, timing each transfer.

    After every put, this blocks on `time_end_queue` for the consumer's
    arrival timestamp before producing the next item (a ping-pong scheme),
    computes the one-way latency against its own perf_counter_ns() start
    time, and finally writes all latencies to latencies_low.csv.
    """
    # float32 uses 4 bytes per element -> 5 MiB tensor.
    num_elements = (5 * 1024 * 1024) // 4
    latencies_ms: Dict[int, float] = {}
    start_times: Dict[int, int] = {}
    for item_index in range(num_items):
        tensor = torch.rand(num_elements, dtype=torch.float32, device="cuda")
        # Ensure tensor materialization is complete before starting transfer timing.
        torch.cuda.synchronize()
        ts_start = time.perf_counter_ns()
        start_times[item_index] = ts_start
        queue.put(("data", item_index, tensor))
        print(f"added item {item_index} to queue, queue size: {queue.qsize()}")
        # Block until the consumer reports the arrival time for this item.
        index, end_ts = time_end_queue.get()
        latencies_ms[item_index] = (end_ts - start_times[index]) / 1e6
        del tensor
    queue.put(("done",))
    with open("latencies_low.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["item_index", "latency_ms"])
        for item_index, latency_ms in sorted(latencies_ms.items()):
            writer.writerow([item_index, latency_ms])
def consumer(in_queue: mp.Queue, time_end_queue: mp.Queue) -> None:
    """Receive items from `in_queue` and echo arrival timestamps back.

    For each ("data", item_index, tensor) tuple, sends
    (item_index, end_timestamp_ns) on `time_end_queue` so the producer can
    compute the one-way latency with its own monotonic clock.  Stops on the
    ("done",) sentinel.
    """
    while True:
        item = in_queue.get()
        # Timestamp immediately after the blocking get so tuple unpacking
        # is not charged to the transfer latency.
        end_ts = time.perf_counter_ns()
        if item[0] == "done":
            break
        _, item_index, tensor = item
        time_end_queue.put((item_index, end_ts))
        # Drop the reference promptly so the shared storage can be released.
        del tensor
def main() -> None:
    """Spawn a producer/consumer pair sharing a data queue and a timestamp
    echo queue; wait for both to finish."""
    mp.set_start_method("spawn", force=True)
    transfer_queue = mp.Queue()
    time_end_queue = mp.Queue()
    workers = [
        mp.Process(target=producer, args=(transfer_queue, time_end_queue, 50)),
        mp.Process(target=consumer, args=(transfer_queue, time_end_queue)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main()
And the results turned out to be much more accurate:
item_index,latency_ms
0,106.522214
1,0.569158
2,0.391128
3,0.333409
.
.
.
47,0.310437
48,0.306123
49,0.307088
Here are my problems with this difference:
-
I can’t think of any concrete reason why there should be such a huge difference in measurement.
-
In either case, there has to be a timestamp being sent through mp.Queue. So the issue shouldn’t be pickling the timestamp integer.
-
Another doubt was that the clocks of the two processes might not be synchronized. But I checked that time.perf_counter_ns() uses CLOCK_MONOTONIC, which is system-wide on Linux, so the readings from the two processes should be comparable and the discrepancy shouldn't be this large.