Hi,
Im using Python to read a partition in Kafka, I have 3 differents partition and I need to read oll the partitions and insert the data in MYSQL table.
The first time that I try my code I read only the first partition “VFBF” and then the script blocked.
I don’t understand why…
Someone can help me?
The code is this:
from kafka import KafkaConsumer
import mysql.connector
from time import sleep
#from json import loads
import json
connection = mysql.connector.connect(host="localhost", database="test",user="root",password="XXXXXXX")
cursore = connection.cursor()
#print(connection)
consumer = KafkaConsumer('VFBF',
group_id='test-consumer-group',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=False)
for msg in consumer:
#, codicecampione, codicepaziente, cognome, nome, sex, materialeid, materialedesc,repartoid,repartodesc, tipomicroorganismo
inserisci_sql = "INSERT INTO kafka_messaggi (cf, codicecampione, codicepaziente, cognome, nome, materialeid, repartoid,repartodesc, tipomicroorganismo) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
#cf = (str(msg[6]["codice campione"]),)
#print (cf)
inserisci_valori = (str(msg[6]["cf"]),str(msg[6]["codice campione"]),str(msg[6]["codice paziente"]),str(msg[6]["cognome"]),str(msg[6]["nome"]),str(msg[6]["materiale"]),str(msg[6]["codice reparto"]),str(msg[6]["descrizione reparto"]),str(msg[6]["tipomicroorganismo"]),)
print (inserisci_valori)
cursore.execute(inserisci_sql,inserisci_valori)
connection.commit()
#print ("sono qua")
#print (msg[6]["cf"])
cursore.close()
connection = mysql.connector.connect(host="localhost", database="test",user="root",password="xxxxxx")
cursore = connection.cursor()
consumer = KafkaConsumer('ZHMEU',
group_id='test-consumer-group',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=False)
for msg in consumer:
#, codicecampione, codicepaziente, cognome, nome, sex, materialeid, materialedesc,repartoid,repartodesc, tipomicroorganismo
inserisci_sql = "INSERT INTO kafka_messaggi (cf, codicecampione, codicepaziente, cognome, nome, materialeid, repartoid,repartodesc, tipomicroorganismo) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
#cf = (str(msg[6]["codice campione"]),)
#print (cf)
inserisci_valori = (str(msg[6]["cf"]),str(msg[6]["codice campione"]),str(msg[6]["codice paziente"]),str(msg[6]["cognome"]),str(msg[6]["nome"]),str(msg[6]["materiale"]),str(msg[6]["codice reparto"]),str(msg[6]["descrizione reparto"]),str(msg[6]["tipomicroorganismo"]),)
print (inserisci_valori)
cursore.execute(inserisci_sql,inserisci_valori)
connection.commit()
#print (msg[6]["cf"])
cursore.close()