2017-07-20 24 views
-1

mqttクライアントが購読しているトピックからすべてのメッセージを受信しようとしていますが、メッセージを送信するたびに最初のメッセージのみを受信します問題は、クライアントがqos 2で10個のメッセージを処理する必要がありますが、代わりに最初のメッセージのみを処理するということです。メッセージは、数ミリ秒の時間間隔で同時に送信されます。私は常にメッセージを送信しているわけではありません。毎分10のメッセージを送信します。両方のクライアントは永続的です。メッセージが送信されるたびに、ペイロードが印刷されるため、メッセージがパブリッシャーから離れることは確実です。受信したメッセージがデータベースに保存され、重複しないようにするためにqos 2を使用しています。私が使用するブローカーはactivemqです。ですから、なぜこれが起こっているのでしょう?Mqttクライアントは同時に複数のメッセージを処理できません

from sqlalchemy.ext.automap import automap_base 
from sqlalchemy.orm import Session 
from sqlalchemy import create_engine 
from sqlalchemy import update 
from sqlalchemy.ext.automap import generate_relationship 
import sqlalchemy 
import paho.mqtt.client as mqtt 
import time 
#Function that define what to do on client conenction 
def on_connect(client, userdata, rc): 
    #Subscribe to all specified topics 
    mqttc.subscribe(topic='/+/mysignals/sensors/+/') 
def on_message(client,userdata,message): 
    #Get the mysignals member id from the topic 
    topic_split = message.topic.split('/') 
    member_id = topic_split[1] 
    session = Session(engine) 
    sensor_id = topic_split[4] 
    patient = session.query(Patient).filter(Patient.mysignalsid==member_id).first() 
    if message.payload == None: 
     payload = 0 
    else: 
     payload = message.payload 
    if patient: 
     current_time = time.time() 
     if patient.id in pending.keys() and (current_time - pending[patient.id]['time_created']) <= 55: 
      pending[patient.id]['record'].__dict__[sensor_id] = payload 
      print time.time() 
     else: 
      pending.pop(patient.id,None) 
      patientdata = PatientData() 
      patientdata.__dict__[sensor_id] = payload 
      print patientdata.__dict__[sensor_id] 
      print payload 
      print patientdata.temp 
      patient.patientdata_collection.append(patientdata) 
      session.add(patientdata) 
      print time.time() 
      pending.update({patient.id:{ 
            'time_created':time.time(), 
            'record':patientdata, 
            }}) 
     session.flush() 
     session.commit() 
     print('Wrote to database.') 

pending = {} 
Base = automap_base() 
engine = create_engine('mysql+mysqlconnector://user:[email protected]/db') 
# reflect the tables 
Base.prepare(engine, reflect=True) 
Patient = Base.classes.patient 
PatientData = Base.classes.patientdata 
session = Session(engine) 
#Create a mqtt client object 
mqttc = mqtt.Client(client_id='database_logger',clean_session=False) 
#Set mqtt client callbacks 
mqttc.on_connect = on_connect 
mqttc.on_message = on_message 
#Set mqtt broker username and password 
mqttc.username_pw_set('blah','blahblah') 
#Connect to the mqtt broker with the specified hostname/ip adress 
mqttc.connect('127.0.0.1') 
mqttc.loop_forever() 

出力リレー:

98 
98 
None 
1500576377.3 
Wrote to database. 
1500576377.43 
Wrote to database. 

出力は次のようになります。

98 
98 
None 
1500576377.3 
Wrote to database. 
25.4 
25.4 
25.4 
1500576377.43 
Wrote to database. 
+0

コードを表示するために質問を編集 – hardillb

+0

投稿を更新し、自分のコードを追加しました。 –

+0

最初に、あなたの話題は '/'で始まって終わるべきではなく、次にコードからどのような出力を得ますか? – hardillb

答えて

-1

それは最終的にはMQTTクライアントの問題ではありませんでした。 コードが間違っていて、2番目のメッセージがデータベースに書き込まれていませんでした。外

session = Session(engine) 

:また、行を削除

setattr(pending[patient.id]['record'],sensor_id,payload) 

:この1と

pending[patient.id]['record'].__dict__[sensor_id] = payload 

:それは仕事を得るためには

は、私は次の行を交換しなければなりませんでしたon_message関数の説明です。セッションにトランザクションがデータベースで実行されるたびにきれいにするために

session.commit() 

:行の下

session.expunge_all() 

は、私はまた、行を追加しました。

関連する問題