2017-11-22 8 views
0

paho mqtt clientを使用して複数のトピックを購読しました。ブローカからメッセージを受け取ると、そのメッセージをmysqlデータベースに保存します。 DBに挿入する前にすべてのメッセージを収集したいと思います。私はしきい値に1000メッセージを設定しました。しきい値に達した場合にのみ、メッセージを一度にDBに挿入する必要があります。私はcursor.execute()の後にrow_countをチェックしています。しかし、それは1としてカウントを示しています。したがって、一括挿入は起こっていません。ここに私のサンプルコードスニペットを実行し、pymysqlモジュールでPython:MQTTブローカのメッセージを一括してmysqlデータベースに挿入

//main.py 

#mysql database class 
db = MySQLDBClass() 

#mqtt client class where subscription,connection to broker,some callbacks 
mqttclient = MyMQTTClient() 
mqttclient.on_message = db.onMessage 
mqttclient.loop_forever() 

//MySQLDBClass.py 

def __init__(self): 

     self.insertcounter = 0 
     self.insertStatement = '' 
     self.bulkpayload = '' 
     self.maxInsert = 1000 

    def onMessage(self, client, userdata, msg): 

     if msg.topic.startswith("topic1/"): 
      self.bulkpayload += "(" + msg.payload.decode("utf-8") + "," + datetime + ")," 
     elif msg.topic.startswith("topic2/"): 
      self.insertStatement += "INSERT INTO mydatabase.table1 VALUES (" + msg.payload.decode("utf-8") + "," + datetime + ");" 
     elif msg.topic.startswith("topic3/") 
      self.insertStatement += "INSERT INTO mydatabase.table2 VALUES (" +msg.payload.decode("utf-8") + "," + datetime + ");" 
     elif msg.topic.startswith("messages"): 
      self.insertStatement += "INSERT INTO mydatabase.table3 VALUES ('" + msg.topic + "'," + msg.payload.decode("utf-8") + "," + datetime + ");" 
     else: 
      return # do not store in DB 

     self.insertcounter += 1 

     if (self.insertcounter > self.maxInsert): 
      if (self.bulkpayload != ''): 
       self.insertStatement += "INSERT INTO mydatabase.table4 VALUES" + self.bulkpayload + ";"  
       self.bulkpayload = '' 

      cursor.execute(self.insertStatement) 
      cursor.commit() 
      print (cursor.rowcount) #prints always count as one , expecting bulk count 
      self.insertcounter = 0 
      self.insertStatement = '' 

答えて