2009-03-15 8 views
5

私は最近、Java Webアプリケーションが別のサーバー上に存在するレガシーVB6アプリケーションと通信するための努力の一環として、C#で素早く汚れた概念証明プロキシサーバーを作成しました。C#の非同期ネットワークコードのパターンはよく知られていますか?

プロキシサーバーとクライアントはどちらも同じメッセージ形式を使用します。コードに私は、サーバーによって生成され、クライアントと応答の両方からの要求を表現するためにProxyMessageクラスを使用します。

public class ProxyMessage 
{ 
    int Length; // message length (not including the length bytes themselves) 
    string Body; // an XML string containing a request/response 

    // writes this message instance in the proper network format to stream 
    // (helper for response messages) 
    WriteToStream(Stream stream) { ... } 
} 

メッセージは以下のようになりほど単純です:ボディ+メッセージ本体の長さ。

クライアントへの接続を表す別のProxyClientクラスがあります。これは、プロキシと単一のクライアント間のすべての対話を処理します。

私は、非同期ソケットプログラミングに関連するボイラープレートコードを簡略化するためのデザインパターンまたはベストプラクティスであると思いますか?たとえば、誤ってバイトを失うことがないように、現在のメッセージの処理にどのくらいの距離があるかを把握する必要があるように、読み込みバッファを管理するために注意する必要があります。現在のコードでは、TcpClient.BeginReadのコールバック関数でこの作業をすべて行い、いくつかのインスタンス変数を使用してバッファの状態と現在のメッセージ処理状態を管理しています。

BeginReadに渡す私のコールバック関数のコードは、コンテキストの関連するインスタンス変数と共に以下のとおりです。コードは "そのまま"うまく動作しているようですが、それを明確にするためにリファクタリングできるかどうかは疑問です。

private enum BufferStates 
{ 
    GetMessageLength, 
    GetMessageBody 
} 
// The read buffer. Initially 4 bytes because we are initially 
// waiting to receive the message length (a 32-bit int) from the client 
// on first connecting. By constraining the buffer length to exactly 4 bytes, 
// we make the buffer management a bit simpler, because 
// we don't have to worry about cases where the buffer might contain 
// the message length plus a few bytes of the message body. 
// Additional bytes will simply be buffered by the OS until we request them. 
byte[] _buffer = new byte[4]; 

// A count of how many bytes read so far in a particular BufferState. 
int _totalBytesRead = 0; 

// The state of the our buffer processing. Initially, we want 
// to read in the message length, as it's the first thing 
// a client will send 
BufferStates _bufferState = BufferStates.GetMessageLength; 

// ...ADDITIONAL CODE OMITTED FOR BREVITY... 

// This is called every time we receive data from 
// the client. 

private void ReadCallback(IAsyncResult ar) 
{ 
    try 
    { 
     int bytesRead = _tcpClient.GetStream().EndRead(ar); 

     if (bytesRead == 0) 
     { 
      // No more data/socket was closed. 
      this.Dispose(); 
      return; 
     } 

     // The state passed to BeginRead is used to hold a ProxyMessage 
     // instance that we use to build to up the message 
     // as it arrives. 
     ProxyMessage message = (ProxyMessage)ar.AsyncState; 

     if(message == null) 
      message = new ProxyMessage(); 

     switch (_bufferState) 
     { 
      case BufferStates.GetMessageLength: 

       _totalBytesRead += bytesRead; 

       // if we have the message length (a 32-bit int) 
       // read it in from the buffer, grow the buffer 
       // to fit the incoming message, and change 
       // state so that the next read will start appending 
       // bytes to the message body 

       if (_totalBytesRead == 4) 
       { 
        int length = BitConverter.ToInt32(_buffer, 0); 
        message.Length = length; 
        _totalBytesRead = 0; 
        _buffer = new byte[message.Length]; 
        _bufferState = BufferStates.GetMessageBody; 
       } 

       break; 

      case BufferStates.GetMessageBody: 

       string bodySegment = Encoding.ASCII.GetString(_buffer, _totalBytesRead, bytesRead); 
       _totalBytesRead += bytesRead; 

       message.Body += bodySegment; 

       if (_totalBytesRead >= message.Length) 
       { 
        // Got a complete message. 
        // Notify anyone interested. 

        // Pass a response ProxyMessage object to 
        // with the event so that receivers of OnReceiveMessage 
        // can send a response back to the client after processing 
        // the request. 
        ProxyMessage response = new ProxyMessage(); 
        OnReceiveMessage(this, new ProxyMessageEventArgs(message, response)); 
        // Send the response to the client 
        response.WriteToStream(_tcpClient.GetStream()); 

        // Re-initialize our state so that we're 
        // ready to receive additional requests... 
        message = new ProxyMessage(); 
        _totalBytesRead = 0; 
        _buffer = new byte[4]; //message length is 32-bit int (4 bytes) 
        _bufferState = BufferStates.GetMessageLength; 
       } 

       break; 
     } 

     // Wait for more data... 
     _tcpClient.GetStream().BeginRead(_buffer, 0, _buffer.Length, this.ReadCallback, message); 
    } 
    catch 
    { 
     // do nothing 
    } 

} 

これまでのところ、私の唯一の本当の考えは別のMessageBufferクラスにバッファに関連するものを抽出し、単に彼らが到着すると、私の読み込みコールバックがそれに新しいバイトを追加することです。 MessageBufferは、現在のBufferStateのようなものを心配し、完全なメッセージを受信したときにイベントを発生させます。ProxyClientは、リクエストを処理できるメインのプロキシサーバーコードにさらに伝播する可能性があります。

+0

あなたはこれを利用できるように開発したもののオープンソース版を持っていないでしょうか? – Maslow

答えて

2

私は同様の問題を克服しなければなりませんでした。ここに私の解決策があります(独自の例に合わせて修正されています)。

(スーパークラスNetworkStream、これはスーパークラスTcpClientなど)の周りにラッパーを作成します。読み取りを監視します。いくつかのデータが読み込まれると、それはバッファされます。長さインジケータ(4バイト)を受け取ると、フルメッセージ(4バイト+メッセージ本文の長さ)があるかどうかを確認します。これを行うと、メッセージ本文でMessageReceivedイベントが発生し、メッセージをバッファから削除します。この技術は、断片化されたメッセージと複数のメッセージ/パケットの状況を自動的に処理します。

public class MessageStream : IMessageStream, IDisposable 
{ 
    public MessageStream(Stream stream) 
    { 
     if(stream == null) 
      throw new ArgumentNullException("stream", "Stream must not be null"); 

     if(!stream.CanWrite || !stream.CanRead) 
      throw new ArgumentException("Stream must be readable and writable", "stream"); 

     this.Stream = stream; 
     this.readBuffer = new byte[512]; 
     messageBuffer = new List<byte>(); 
     stream.BeginRead(readBuffer, 0, readBuffer.Length, new AsyncCallback(ReadCallback), null); 
    } 

    // These belong to the ReadCallback thread only. 
    private byte[] readBuffer; 
    private List<byte> messageBuffer; 

    private void ReadCallback(IAsyncResult result) 
    { 
     int bytesRead = Stream.EndRead(result); 
     messageBuffer.AddRange(readBuffer.Take(bytesRead)); 

     if(messageBuffer.Count >= 4) 
     { 
      int length = BitConverter.ToInt32(messageBuffer.Take(4).ToArray(), 0); // 4 bytes per int32 

      // Keep buffering until we get a full message. 

      if(messageBuffer.Count >= length + 4) 
      { 
       messageBuffer.Skip(4); 
       OnMessageReceived(new MessageEventArgs(messageBuffer.Take(length))); 
       messageBuffer.Skip(length); 
      } 
     } 

     // FIXME below is kinda hacky (I don't know the proper way of doing things...) 

     // Don't bother reading again. We don't have stream access. 
     if(disposed) 
      return; 

     try 
     { 
      Stream.BeginRead(readBuffer, 0, readBuffer.Length, new AsyncCallback(ReadCallback), null); 
     } 
     catch(ObjectDisposedException) 
     { 
      // DO NOTHING 
      // Ends read loop. 
     } 
    } 

    public Stream Stream 
    { 
     get; 
     private set; 
    } 

    public event EventHandler<MessageEventArgs> MessageReceived; 

    protected virtual void OnMessageReceived(MessageEventArgs e) 
    { 
     var messageReceived = MessageReceived; 

     if(messageReceived != null) 
      messageReceived(this, e); 
    } 

    public virtual void SendMessage(Message message) 
    { 
     // Have fun ... 
    } 

    // Dispose stuff here 
} 
+1

私はここですべての答えが好きだったので、他の人は+1しましたが、結局私はこの答えに非常に似ていました。それは簡単で分かりやすいので、今から数ヶ月後に何をやっていたのかを覚えています; –

1

私が使っているデザインは大丈夫だと思いますが、これはおおまかなことですが、同じようなことをやったことがあります。私はあなたが追加のクラス/構造体にリファクタリングすることによって多くを得るとは思っていません。

私が持っている唯一のコメントは、最初のものが常にメッセジーの長さであり、2番目のものが常にボディであるものが十分に頑丈であるかどうかということです。私は、そうでないような状況(例えば、間違った長さを送信するなど)によって何らかの理由で同期が外れてしまったように、そのようなアプローチには常に気をつけています。代わりに、私は大きなバッファで1回の読み込みを行い、ネットワークから利用可能なすべてのデータを取得し、完全なメッセージを抽出するためにバッファを検査します。そうすれば、現在のバッファが破棄され、クリーンな状態に戻って、サービス全体を停止するのではなく、現在のメッセージだけが失われてしまいます。

実際には、メッセージ本文が大きく、2つの別々の受信に到着し、次のメッセージが送信された場合、前の本文の後半と同じ長さのメッセージが送信されます。それが起こった場合、あなたのメッセージの長さは前のメッセージの本文に追加され、前の段落で説明されていない状況になってしまいます。

+0

Hmm。私は間違った長さを送っているクライアントが、コードが書かれている方法ではっきりと物事を混乱させることに同意し、私はそれをどう扱うのが最善かを議論しました。私は、クライアントが正しくフォーマットされたメッセージを送信していない場合、それほど気にしません。ゴミ出し、ゴミ出し;-) –

+0

あなたの最後の段落に良いキャッチ。私のコードをもう一度見て、あなたは2つのメッセージが重複している可能性があると思います。アイロニーは、最初に問題を回避するためにバッファのサイズを変更していたが、大きなメッセージがネットワークによって分割された場合にどうなるかについては考えなかった。 Oops –

+0

実際には、それについてもう少し考えた後、私のアプローチはまだ動作していると思います。 OSレベルのソケットバッファには、1つのメッセージの終わりと次のメッセージの始まりを含めることができます。しかし、BeginRead/EndReadは、BeginReadに渡されたバッファ内の... –

1

あなたは非同期コールバック用のステートマシンの生成を自動化するためにyield returnを使用することができます。ジェフリー・リヒターはこのテクニックを彼のAsyncEnumeratorクラスで宣伝しています。私はそのアイデアで遊んだことがありますhere

1

あなたのやり方に間違いはありません。私にとっては、データの受信を処理から切り離すことが好きです。これは、あなたが提案したMessageBufferクラスで考えているようです。私はそのことを詳細にhereについて議論しました。

+0

実際にメッセージ処理ロジックから受信を分離したいと思っています。 –

+0

私の質問では言及しなかったのは、プロキシサーバーは最終的には異なるプロトコルを処理しなければならないということでした(それは基本的に共通のプロキシメッセージ形式でメッセージをトンネリングします)ので、別のロジックを送信する良い方法を考えていたと思いますトンネリングされるプロトコルに基づいて行われる。 –

関連する問題