2016-07-15 4 views
0

Azureイベントハブを設定しました.PythonスクリプトからAMQPメッセージをJSON形式で送信しています。これらのメッセージをストリームアナリティクスを使用してPower BIにストリーミングしようとしています。 メッセージから非常に単純なデバイスのアクティビティとのIoTデバイスストリームアナリティクス、イベントハブ経由でPythonからJSON deserialising

ザPythonのスニペットはノーバックイベント・ハブからデータを読み取るためにMSのチュートリアル例C#メッセージリーダーを使用している

msg = json.dumps({ "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz }, ensure_ascii=False, encoding='utf8') 
message.body = msg 
messenger.put(message) 
messenger.send() 

あります問題は、出力は次のとおりです。

Message received. Partition: '2', Data: '??{"DeviceUID": "z_70b3d515200002e7_0", "Signal": "/on?1", "DeviceID": "1", "Hub": "91754623489", "Timestamp": "2016-07-15T07:56:50.277440Z"}' 

しかし、私は、イベント・ハブからのストリーム解析の入力をテストしようとすると、私はエラーを取得

診断:入力イベントをJsonとしてデシリアライズできませんでした。いくつかの理由が考えられます:1)不正なイベント2)不正なシリアル化形式で入力された入力ソース

不正なイベントの意味がわかりません - 私はStream AnalyticsがAMQP経由でイベントハブに送信されたデータに対応できると思っています。

BOMシンボルが問題を引き起こさない限り、C#アプリケーションで受け取ったJSONに何か問題はありませんか?

これは私の最初の試みであり、誰も正しい方向に向けることができたら、本当に感謝しています。

乾杯 ロブ

答えて

2

これは、クライアントAPIの非互換性によって引き起こされます。 Pythonは、Protonを使用して、AMQP Valueメッセージの本文にJSON文字列を送信します。本文は、AMQP文字列(AMQP型の文字列のバイト+ utf8でエンコードされたバイト)でエンコードされます。 Stream AnalyticsはAMQPメッセージをEventDataとして公開するService Bus .Net SDKを使用し、その本体は常にバイト配列です。 AMQP値メッセージの場合、AMQPタイプのエンコーディングバイトが含まれているため、次の値をデコードできません。これらの余分なバイトが最初にあると、JSONのシリアル化が失敗します。

メッセージ本文の相互運用性を実現するには、アプリケーションで、パブリッシャとコンシューマがその型とエンコーディングに同意する必要があります。この場合、パブリッシャはAMQPデータメッセージで生のバイトを送信する必要があります。

message.body = msg.encode('utf-8') 

もう1つの回避策は、アプリケーションのプロパティで単純な型(文字列など)を送信することです。

他の人もこの問題に遭遇しました。 https://github.com/Azure/amqpnetlite/issues/117

0

@ XinChen氏によると、この問題はAMQPプロトコルによって発生しています。

私の経験上、以下の2つの回避策がこの場合に有効です。

  1. AMQPを持つ代わりに、紺碧のPython SDKのSend Event REST APIを使用しますが、残りのAPIは、高性能ではないHTTPプロトコルに基づいています。
  2. JSONメッセージをBase64エンコーディングで送信し、受信したメッセージをJSON文字列にデコードします。
0

これら二つのことは、私の仕事:

  • は必ずご指定はあなたの例のようにエンコーディングencoding='utf-8'ないencoding='utf8'ダンプ作るためにmessage.inferred = True
  • チェックを追加します。

更新OP:

msg = json.dumps({ "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz }, ensure_ascii=False, encoding='utf-8') 
message.body = msg 
message.inferred = True 
messenger.put(message) 
messenger.send() 

推測されたフラグを追加することで、私はメッセージシリアライザが適切に体がバイトであることを推測するため、@Xin Chenのポイントに取り組む、AMPQデータを作成することができると思います。

メッセージの推定フラグは、メッセージの内容がどのようにAMQPセクションにエンコードされているかを示します。推論が真の場合、メッセージ本体のバイナリとリストの値は、それぞれAMQP DATAセクションとAMQP SEQUENCEセクションとしてエンコードされます。推定値がfalseの場合、メッセージ本文のすべての値は、型に関係なくAMQP VALUEセクションとしてエンコードされます。

再:Qpid Proton Docs #inferred

再:JSON Encoder and Decoder #dumps

関連する問題