When I run the code below, memory usage behaves differently depending on the OS:

- On Ubuntu, it keeps increasing; garbage collection is not happening as expected.
- On macOS, it stays linear with the Kafka consumption rate; garbage collection happens as expected.

Any idea why we are seeing this behaviour on Ubuntu systems?
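To separate a Python-level leak from allocator behaviour, one check worth running next to the existing psutil logging (a minimal sketch; `log_memory` is a hypothetical helper, not part of the code below):

```
import gc
import psutil

def log_memory(logger):
    # Compare OS-level RSS with the number of objects Python's GC is
    # tracking. If the object count stays flat while RSS keeps climbing,
    # the memory is being retained by the C-level allocator rather than
    # by uncollected Python objects.
    rss_mb = psutil.Process().memory_info().rss / (1024 ** 2)
    logger.info(f"RSS: {rss_mb:.2f} MB | "
                f"gc objects: {len(gc.get_objects())} | "
                f"gc counts: {gc.get_count()}")
```

Calling this right after the `gc.collect()` in the loop would show whether collections actually shrink the Python heap. The consumer code: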
```
import time
import logging
import gc

import orjson
import psutil
import clickhouse_connect
from kafka import KafkaConsumer
from polars_processing import packet_preprocessing


def read_messages():
    consumer_logger = logging.getLogger("read_logger")
    consumer_logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    consumer_logger.addHandler(handler)
    consumer_logger.info("Kafka Reading Process Started")

    def post_commit_callback(offsets, err=None):
        if err:
            consumer_logger.info(f"Commit error: {err}")
        else:
            consumer_logger.info("Offsets committed successfully!")
        current_memory = psutil.Process().memory_info().rss
        consumer_logger.info(f"Memory Usage: {current_memory / (1024 ** 2):.2f} MB")

    consumer_config = {
        'bootstrap_servers': 'localhost:9093',
        'group_id': 'test',
        'auto_offset_reset': 'earliest',  # start consuming from the beginning of the topic
        'enable_auto_commit': False,      # offsets are committed manually after each flush
        'max_poll_records': 50000,
        'fetch_max_bytes': 52428800,
        'max_partition_fetch_bytes': 52428800,
        'fetch_max_wait_ms': 500,
    }
    consumer = KafkaConsumer('Track', **consumer_config)

    prev_time = time.perf_counter()
    batch_lis = []
    while True:
        records = consumer.poll(100)
        for tp, messages in records.items():
            for message in messages:
                if message is not None:
                    # orjson accepts bytes directly, no decode needed
                    packet = orjson.loads(message.value)
                    if packet['imeiNo'] == '999999999999999':
                        batch_lis.append(packet)

        # flush the batch every 5 seconds or every 25k messages
        cur_time = time.perf_counter()
        if cur_time - prev_time >= 5 or len(batch_lis) >= 25000:
            if batch_lis:
                df = packet_preprocessing(batch_lis)
                batch_lis = []
                if df.shape[0] > 0:
                    insert_start_time = time.perf_counter()
                    insertion(df, consumer_logger)
                    consumer_logger.info(f"Per batch insertion time: {time.perf_counter() - insert_start_time}")
                    consumer_logger.info(f"Per batch messages: {df.shape}")
                del df
            prev_time = cur_time
            consumer.commit_async(callback=post_commit_callback)
            gc.collect()


def insertion(df, consumer_logger):
    client = clickhouse_connect.get_client(
        host='10.44.66.186',
        port=8123,
        username='xyz', password='xyx', database='xyx')
    settings = {
        'async_insert': 1,
        'wait_for_async_insert': 1,
        'async_insert_threads': 1,
        'async_insert_busy_timeout_ms': 5000,
        'async_insert_max_data_size': 10000000000,
    }
    try:
        client.insert_df('u388_data_python', df, settings=settings)
    except Exception as err:
        consumer_logger.info("ERROR message")
        consumer_logger.info(err.__class__)
        consumer_logger.info(err)
    finally:
        # close on every path, not only on success
        client.close()


if __name__ == '__main__':
    read_messages()
```
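Separately from the OS difference, `insertion()` creates a new ClickHouse client (and its HTTP session) for every batch. A sketch of creating the client once and reusing it, assuming the same connection details and settings as above:

```
import clickhouse_connect

# Created once at startup instead of once per batch; the insert
# settings dict can stay the same as in insertion() above.
client = clickhouse_connect.get_client(
    host='10.44.66.186', port=8123,
    username='xyz', password='xyx', database='xyx')

def insertion(df, consumer_logger):
    try:
        client.insert_df('u388_data_python', df)
    except Exception as err:
        consumer_logger.info(f"Insert failed: {err!r}")
```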
Mac memory usage: (memory graph omitted)

Ubuntu memory usage: it just stays up at ~500 MB (memory graph omitted)

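One hypothesis worth testing (an assumption on my part, not confirmed): glibc's malloc on Linux keeps freed memory in its arenas and does not always return it to the OS, while the macOS allocator releases pages more eagerly, so RSS on Ubuntu can keep climbing even when `gc.collect()` is freeing objects. A quick way to probe this is to ask glibc to trim the heap explicitly:

```
import ctypes
import ctypes.util

def trim_malloc():
    # glibc-only: malloc_trim(0) asks the allocator to return free heap
    # pages to the OS. The symbol does not exist on macOS, hence the guard.
    libc = ctypes.CDLL(ctypes.util.find_library("c"))
    if hasattr(libc, "malloc_trim"):
        libc.malloc_trim(0)
```

If calling `trim_malloc()` after each `gc.collect()` makes the Ubuntu RSS drop, the growth is allocator retention rather than a Python leak; running the process with `MALLOC_ARENA_MAX=2` set in the environment tests the same hypothesis.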