Memory Leaking in Ubuntu not in Mac

When i run the below code, the memory_usage

  • In Ubuntu, it keeps on increasing. Garbage collection is Not happening as expected

  • In MAC OS, it is linear with kafka-consumption rate… Garbage collection is happening as expected.

Any idea why we are seeing this behaviour in Ubuntu systems.

import json
import orjson
import clickhouse_connect
import pandas as pd
import time
import multiprocessing as mp
import logging
from polars_processing import packet_preprocessing
from multiprocessing import Process, Queue
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from multiprocessing import Pool
from pprint import pprint
import functools
import psutil
import gc
# from confluent_kafka import Consumer, KafkaException




def read_messages():
    consumer_logger = logging.getLogger("read_logger")
    consumer_logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    consumer_logger.addHandler(handler)
    consumer_logger.info("Kafka Reading Process Started")

    def post_commit_callback(offsets, err=None):
        if err:
            # pprint(before_parition_offset)
            consumer_logger.info(f"Commit error: {err}")
        else:
            # pprint(before_parition_offset)
            consumer_logger.info("Offsets committed successfully!")
        current_memory = psutil.Process().memory_info().rss
        consumer_logger.info(f" Memory Usage: {current_memory / (1024 ** 2):.2f} MB")

    consumer_config = {
            'bootstrap_servers': 'localhost:9093',
            'group_id': 'test',
            'auto_offset_reset': 'earliest',  # Start consuming from the beginning of the topic
            'enable_auto_commit': False , # Disable automatic committing of offsets
            'max_poll_records':50000,
            'fetch_max_bytes':52428800,
            'max_partition_fetch_bytes':52428800,
            'fetch_max_wait_ms':500,
        }

    consumer = KafkaConsumer('Track', **consumer_config)
    prev_time= time.perf_counter()


    message_counter=0
    batch_lis=[]
    while True: 
        records = consumer.poll(100, 
                                    # max_records=5
                                )
        
        # if records:
        for tp, messages in records.items():
                for message in messages:
                    if message is not None:
                        packet = orjson.loads(message.value.decode('utf-8'))
                        if packet['imeiNo']=='999999999999999':
                            # continue
                            # print(tp.partition)
                            # print(message.offset)
                            # consumer_logger.info(f" Message counter: {message_counter}")
                            batch_lis.append(packet)
        cur_time=time.perf_counter()                 
        if cur_time-prev_time>=5 or len(batch_lis)>=25000:
                # consumer_logger.info(f"{(cur_time-prev_time, len(batch_lis)"})        
                if len(batch_lis)!=0:
                    # print("batch_counter",len(batch_lis))
                    # print("message_counter",message_counter)
                    df = packet_preprocessing(batch_lis)
                    batch_lis=[]
                    insert_start_time=time.perf_counter()
                    if df.shape[0]>0:
                        insert_start_time=time.perf_counter()
                        insertion(df,consumer_logger)
                        consumer_logger.info(f" Per batch insertion time: {time.perf_counter()-insert_start_time}")
                        consumer_logger.info(f" Per batch messages: {df.shape}")
                    del df
                prev_time=cur_time
                
                consumer.commit_async(callback=post_commit_callback)
        gc.collect()


def insertion(df,consumer_logger):
    client = clickhouse_connect.get_client(
                        host='10.44.66.186',
                        port=8123,
                        username='xyz', password='xyx',database='xyx')
    settings = {
            'async_insert': 1,
            'wait_for_async_insert': 1,
            'async_insert_threads':1,
            'async_insert_busy_timeout_ms': 5000,
            'async_insert_max_data_size': 10000000000,
        }

    try:
        startTime=time.perf_counter()
        client.insert_df('u388_data_python',
                        df,
                        # column_names=table_columns,
                        settings=settings)
        client.close()

    except Exception as err:
        consumer_logger.info("ERROR  message")
        consumer_logger.info(err.__class__)
        consumer_logger.info(err)       


if __name__ == '__main__':
    read_messages()```


in Mac Memory usage:
![Screenshot 2024-01-23 at 12.21.37 PM|690x355](upload://86GQWSEzyAXY8TBFkLZt4hrXDNG.png)


In Ubuntu Memory usage: Its just staying there at ~500mb
![Screenshot 2024-01-23 at 12.23.08 PM|690x405](upload://bspCEMGXlhZ29BoGqP9niMFWxPg.jpeg)

When memory use increases over time it is becuase your app is keeping objects in use usually. It is very rarely becuase of the garbage collector.

Do you see the memory increase for a few hours of running then stablise or increase all the time?

In MAC, garbage collection is happening
In ubuntu memory is not freeing itself and giving to os. Garbage collection is not happening

Over time memory usage is not at all going down in Ubuntu.

I suspect what you are seeing is not a memory leak.

gc does not free memory. It deletes python objects.
Depending on cpython implemenation details, C library details and OS details that may or may not lead to pages of memory being released.
But that is not necessarily a memory leak as the memory will be reused for future objects.

How exactly are you measuring process memory usage on both OS?

1 Like

I agree that this is unlikely to be an actual memory leak. Please read:

(Stack Overflow has many similar Q&As to the second one, but nothing emerges as clearly authoritative, IMO.)

(Edit: Actually maybe this is more like it:)

1 Like

Thanks @kknechtel , @barry-scott , it is helpful… i now see the objects being deleted but the python process is not releasing the memory back to OS. I understand this is the Cpython default behaviour. Is there a way to force python to release memory back to OS when no longer in use for some amount in time, without using child processes ?

Have you proved that you are not preventing python objects from being deleted?
If not then nothing you do to tune memory will help in the end.

You can try changing the configuration so glibc’s malloc and see if that helps.
See man mallopt for what can be tuned and the env vars that control the options.

No, because it isn’t Python refusing to release it; it’s the OS refusing to claim it. As far as the OS is concerned, Python might just ask for it again in the next microsecond, so it has no reason to try to clean up unless another process is clamoring for memory that isn’t more obviously freely available.