paho mqtt clientを使用して複数のトピックを購読しました。ブローカからメッセージを受け取ると、そのメッセージをmysqlデータベースに保存します。 DBに挿入する前にすべてのメッセージを収集したいと思います。私はしきい値に1000メッセージを設定しました。しきい値に達した場合にのみ、メッセージを一度にDBに挿入する必要があります。私はcursor.execute()の後にrow_countをチェックしています。しかし、それは1としてカウントを示しています。したがって、一括挿入は起こっていません。ここに私のサンプルコードスニペットを実行し、pymysqlモジュールでPython:MQTTブローカのメッセージを一括してmysqlデータベースに挿入
//main.py
#mysql database class
db = MySQLDBClass()
#mqtt client class where subscription,connection to broker,some callbacks
mqttclient = MyMQTTClient()
mqttclient.on_message = db.onMessage
mqttclient.loop_forever()
//MySQLDBClass.py
def __init__(self):
self.insertcounter = 0
self.insertStatement = ''
self.bulkpayload = ''
self.maxInsert = 1000
def onMessage(self, client, userdata, msg):
if msg.topic.startswith("topic1/"):
self.bulkpayload += "(" + msg.payload.decode("utf-8") + "," + datetime + "),"
elif msg.topic.startswith("topic2/"):
self.insertStatement += "INSERT INTO mydatabase.table1 VALUES (" + msg.payload.decode("utf-8") + "," + datetime + ");"
elif msg.topic.startswith("topic3/")
self.insertStatement += "INSERT INTO mydatabase.table2 VALUES (" +msg.payload.decode("utf-8") + "," + datetime + ");"
elif msg.topic.startswith("messages"):
self.insertStatement += "INSERT INTO mydatabase.table3 VALUES ('" + msg.topic + "'," + msg.payload.decode("utf-8") + "," + datetime + ");"
else:
return # do not store in DB
self.insertcounter += 1
if (self.insertcounter > self.maxInsert):
if (self.bulkpayload != ''):
self.insertStatement += "INSERT INTO mydatabase.table4 VALUES" + self.bulkpayload + ";"
self.bulkpayload = ''
cursor.execute(self.insertStatement)
cursor.commit()
print (cursor.rowcount) #prints always count as one , expecting bulk count
self.insertcounter = 0
self.insertStatement = ''