Python and consumer in Kafka

Hi,
Im using Python to read a partition in Kafka, I have 3 differents partition and I need to read oll the partitions and insert the data in MYSQL table.
The first time that I try my code I read only the first partition “VFBF” and then the script blocked.
I don’t understand why…

Someone can help me?

The code is this:


from kafka import KafkaConsumer
import mysql.connector
from time import sleep
#from json import loads
import json

connection = mysql.connector.connect(host="localhost", database="test",user="root",password="XXXXXXX")
cursore = connection.cursor()
#print(connection)


consumer = KafkaConsumer('VFBF',
                        group_id='test-consumer-group',
                        bootstrap_servers=['localhost:9092'],
                        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                        auto_offset_reset='earliest',
                        enable_auto_commit=False)
                        

for msg in consumer:
    #, codicecampione, codicepaziente, cognome, nome, sex, materialeid, materialedesc,repartoid,repartodesc, tipomicroorganismo
    inserisci_sql = "INSERT INTO kafka_messaggi (cf, codicecampione, codicepaziente, cognome, nome, materialeid, repartoid,repartodesc, tipomicroorganismo) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    #cf = (str(msg[6]["codice campione"]),)
    #print (cf)
    inserisci_valori = (str(msg[6]["cf"]),str(msg[6]["codice campione"]),str(msg[6]["codice paziente"]),str(msg[6]["cognome"]),str(msg[6]["nome"]),str(msg[6]["materiale"]),str(msg[6]["codice reparto"]),str(msg[6]["descrizione reparto"]),str(msg[6]["tipomicroorganismo"]),)
    print (inserisci_valori)
    cursore.execute(inserisci_sql,inserisci_valori)
    connection.commit()
    #print ("sono qua")
    #print (msg[6]["cf"])

cursore.close()

connection = mysql.connector.connect(host="localhost", database="test",user="root",password="xxxxxx")
cursore = connection.cursor()

consumer = KafkaConsumer('ZHMEU',
                        group_id='test-consumer-group',
                        bootstrap_servers=['localhost:9092'],
                        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                        auto_offset_reset='earliest',
                        enable_auto_commit=False)
                        

for msg in consumer:
    #, codicecampione, codicepaziente, cognome, nome, sex, materialeid, materialedesc,repartoid,repartodesc, tipomicroorganismo
    inserisci_sql = "INSERT INTO kafka_messaggi (cf, codicecampione, codicepaziente, cognome, nome, materialeid, repartoid,repartodesc, tipomicroorganismo) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    #cf = (str(msg[6]["codice campione"]),)
    #print (cf)
    inserisci_valori = (str(msg[6]["cf"]),str(msg[6]["codice campione"]),str(msg[6]["codice paziente"]),str(msg[6]["cognome"]),str(msg[6]["nome"]),str(msg[6]["materiale"]),str(msg[6]["codice reparto"]),str(msg[6]["descrizione reparto"]),str(msg[6]["tipomicroorganismo"]),)
    print (inserisci_valori)
    cursore.execute(inserisci_sql,inserisci_valori)
    connection.commit()
    #print (msg[6]["cf"])
    
cursore.close()