2017-01-20 4 views
1

私はInMemNetworkを使ってテストを実行しています。Rebus - ユニットテストを実行し、Rebusがすべてのスレッドを完了するのを待ちます

アサーションの直前にThread.Sleepコールがありますが、これはテストの厄介な方法であり、テストが大幅に遅くなります。

私は、SagaFixturesと、同期して動作するシンプルなIBusの実装を使用していくつかの統合テストを行っていますが、ハンドラの登録、ハンドラの実行、メッセージの遅延などが少し面倒です。

Rebusが使用しているすべてのスレッドが、ManualResetEvent(Rebus独自のテストで使用される)を使用してプロダクションコードを増強せずに実行を終了するまで待機する方法はありますか?

答えて

0

私は通常、SagaFixtureを使用してから、FakeBusを使用して、アクションをキャプチャするためにsagaハンドラに挿入します。

私のテストのほとんどは、単純なハンドラの単体テストですが、私はしばしば、「本当の」サービスを注入します。実際のデータベースに移動するIThisIThatの実装。

複数のエンドポイントをin-memトランスポートでスピンアップしても、通常はInMemNetworkの拡張機能を実装すると、特定のイベントが公開されるのを待つのに役立ちます。試験では、この:WaitForNextは単に待ち行列が次のメッセージをsubscriberAddressによって指定さWhateverUpdatedとして逆シリアル化しようとポーリング拡張メソッドである

var updated = await Network.WaitForNext<WhateverUpdated>(subscriberAddress, timeoutSeconds: 20); 

私はあなたにいくつかのインスピレーションを与えることができます:)

+0

のようである可能性があります。どうもありがとう! –

0

いくつかのシナリオの私は、すべてのメッセージの処理を完了するために、REBUSを待つために、次のアプローチを使用するために願っています。リバースエンドポイントは別々のexeでホストされ、rebusファイルシステムトランスポートは統合テストに使用されます(通常Azure SBです)。統合テストはexeファイルをスピンアップし、各exeファイルにRebusは0人の従業員で構成されているので、何もしません。次に、テストでは、処理するメッセージがなくなるまで、ワーカーとブロックの数を設定するWaitForMessagesProcessed()メソッドがあります。ここで

は、それが大体のコードでどのように見えるかです:これは完璧な解決策になるため

public class MessageProcessor() { 
    private string queueName; 
    private int messagesWaitingForProcessing; 
    private IBus bus; 

    public MessageProcessor(string queueName) { 
     this.queueName = queueName; 

     this.bus = Configure.With(adapter) 
     .Transport(t => t.UseFileSystem(@"c:\baseDirectory", this.queueName)) 
     .Options(o => 
     { 
      o.SetNumberOfWorkers(0); 
     }) 
     .Events(e => 
     { 
      e.BeforeMessageSent += (thebus, headers, message, context) => 
      { 
       // When sending to itself, the message is not queued on the network. 
       var m = context.Load<Rebus.Pipeline.Send.DestinationAddresses>(); 
       if (m.Any(t => t == this.queueName)) 
        this.messagesWaitingForProcessing++; 
      }; 

      e.AfterMessageHandled += (thebus, headers, message, context, args) => 
      { 
       this.messagesWaitingForProcessing--; 
      }; 
     }) 
     .Start(); 
    } 

    public async Task WaitForMessagesProcessed() 
    { 
     this.DetermineMessagesWaitingForProcessing(); 
     while (this.messagesWaitingForProcessing > 0) 
     { 
      this.bus.Advanced.Workers.SetNumberOfWorkers(2); 

      while (this.messagesWaitingForProcessing > 0) 
      { 
       await Task.Delay(100); 
      } 

      this.bus.Advanced.Workers.SetNumberOfWorkers(0); 

      this.DetermineMessagesWaitingForProcessing(); 
     } 
    } 

    public void DetermineMessagesWaitingForProcessing() { 
     this.messagesWaitingForProcessing = Directory.GetFiles(GetDirectoryForQueueNamed(this.queueName),  "*.rebusmessage.json").Count(); 
    } 

    private static string GetDirectoryForQueueNamed(string queueName) 
    { 
     return Path.Combine(this.baseDiretory, queueName); 
    } 
} 

テストでは、拡張メソッドを作成

[TestMethod] 
public void Test() { 
    var endpoint1 = LaunchExe("1"); 
    var endpoint2 = LaunchExe("2"); 

    endPoint1.DoSomeAction(); 

    endPoint1.WaitForMessagesProcessed(); 

    Assert.AreEqual("expectation", endPoint1.Query()); 
} 
関連する問題