2016-03-25 13 views
1

Microsoft Azureサービスバスを使用してコマンドメッセージを送信しており、OnMessageアプローチを使用してバスから取得しています。 IMessageSessionAsyncHandler.OnMessageAsyncの実装では、メッセージの処理準備ができていないことがわかります。だから私はバスからメッセージを受け取り、キューの最後まで読んでみたい。 get/readd操作はアトミックでなければなりません。どうすればこれを達成できますか?Azureサービスバス - キューの最後までのOnMessageAsyncでのReadメッセージ

私の現在の解決策(高度に抽象化されている)ですが、非アトミック性を恐れています。

queueClient.RegisterSessionHandlerFactory(new CommandSessionHandlerFactory(queueClient), ...); 

internal class CommandSessionHandlerFactory : IMessageSessionAsyncHandlerFactory 
{ 
    private readonly QueueClient _queueClient; 
    public CommandSessionHandlerFactory(QueueClient queueClient) 
    { 
     _queueClient = queueClient; 
    } 

    public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message) 
    { 
     return new CommandSessionHandlerAsync(_queueClient); 
    } 
} 

internal class CommandSessionHandlerAsync : MessageSessionAsyncHandler 
{ 
    private readonly QueueClient _queueClient; 
    public CommandSessionHandlerAsync(QueueClient queueClient) 
    { 
     _queueClient = queueClient; 
    } 

    protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message) 
    { 
     if (!messageReadyForProcessing) 
     { 
      // How to get the following code transactional safe? 
      var clonedMessage = message.Clone(); 
      await message.CompleteAsync(); 
      await _queueClient.SendAsync(clonedMessage); 
     } 
    } 
} 

そして、団体検出はどうですか?複製されたメッセージのMessageIdを変更して、サービスバスの重複検出によってクローンメッセージが破棄されないようにする必要がありますか?

答えて

1

そして、団体検出はどうですか?複製されたメッセージのMessageId を変更して、サービスバス重複 の検出でクローンメッセージが破棄されないようにする必要がありますか?

重複検出が有効な場合。あなたのメッセージはキューから削除されます。あなたが同じMessageIdを使用しており、検出時間が重複している場合はmessageIdを使用しているため(デフォルトは10分)

 if (!messageReadyForProcessing) 
     { 
      // How to get the following code transactional safe? 
      var clonedMessage = message.Clone(); 
      await message.CompleteAsync(); 
      await _queueClient.SendAsync(clonedMessage); 
     } 

あなたはこれを変更することができます。

  if (!messageReadyForProcessing) 
      { 
       await message.CompleteAsync(); 
       message.MessageId = Guid.NewGuid().ToString(); 
       await _queueClient.SendAsync(message); 
      } 

しかし、まだ問題があります。メッセージは正常に完了しましたが、メッセージの送信に失敗した場合はどうなりますか?あなたはメッセージを失うでしょう。再試行ポリシーでこれを解決できます。しかしそれは汚れており、100%保証されていません。

あなたはMax DeliveryCountを増やすことができます。もう一度メッセージを送信する以外に、もう一度使用するには放棄(放棄)してください。

 if (!messageReadyForProcessing) 
      { 
       await message.AbandonAsyncy(); 

      } 

これは優れています。あなたは確かに、それはトランザクションです。最大配信数を超えても、デッドキューに入ります。

しかし、まだ欠点があります。本当にポジティブなメッセージはどうなりますか?したがって、決して処理されないメッセージがあれば、それはあなたの消費者を占有するでしょう。最大配信数が増えたためです。

簡潔に言えば、OnMessageはあなたのソリューションには適していません。あなたがそれを処理する準備ができたら、キューからメッセージを取得するだけです。私はこれがあなたにとって最も適した解決策だと思います。

if (messageReadyForProcessing) 
    { 
     var mySession=QueueClient.AcceptMessageSession(); 
      var message = mySession.Receive(); 
    } 

編集:

あなたはTransactionScope with service busを使用することができます。スコープ内の同じキューで作業する必要があります。

Ps:トランザクションスコープは放棄をサポートしていません。

だから、これを適用することができます。

if (!messageReadyForProcessing) 
     { 
     using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) 
      { 
        await message.CompleteAsync(); 
        message.MessageId = Guid.NewGuid().ToString(); 
        await _queueClient.SendAsync(message); 
      scope.Complete();     
     } 
    } 

チェックBrokered Messaging: Transactions

私はこれがasyncsで動作するか分かりません。したがって成功しない場合は、Asynchronous Transactions with Service Busをテストして確認することができます。

Ps:彼らは、トランザクションスコープと非同期についてネットフレームワークにバグがあると言います。彼らは4.5.1以上を使用することをお勧めします。さらにhereを確認してください。

+0

1)メッセージXの前に処理する必要がある他のメッセージがあるので、メッセージXをキューの最後に置く必要があるため、放棄は適切ではありません。2)messageReadyForProcessingはセッションに基づく決定です。したがって、あなたの最後の提案は残念ながら、私たちがすでにセッションに入っているので私たちが行く方法ではありません。 –

+0

私は、終わりまたは始まりのキューに放棄した後に何が起こるか、鋭い答えを見つけることができないことがわかります。しかし、私はあなたがトランザクションスコープを使用できる私の答えを編集しました。 –

+1

ありがとうございました。私はTransactionScopeについて読んだことがありますが、私たちの場合にはうまくいきませんでした。 "async"に関する疑問について:.NET Framework 4.5.1以降はうまくいくようです。 TransactionScopeの新しいコンストラクタ、TransactionScopeAsyncFlowOptionがあります:http://stackoverflow.com/questions/13543254/get-transactionscope-to-work-with-async-await/17527759#17527759あなたの例にパラメータを追加します。 –

関連する問題