2010-12-29 35 views
14

キューを処理する最善の方法を理解しようとしています。私はDataTableを返すプロセスを持っています。各DataTableは、前のDataTableとマージされます。 1つの問題があります。最後のBulkCopy(OutOfMemory)まで保持するレコードが多すぎます。ConcurrentQueueを使用してスレッドを処理する方法<T>

したがって、私は各着信DataTableを直ちに処理する必要があると判断しました。 ConcurrentQueue<T>について考えると、WriteQueuedData()メソッドがテーブルをデキューしてデータベースに書き込む方法を知ることができません。例えば

public class TableTransporter 
{ 
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>(); 

    public TableTransporter() 
    { 
     tableQueue.OnItemQueued += new EventHandler(WriteQueuedData); // no events available 
    } 

    public void ExtractData() 
    { 
     DataTable table; 

     // perform data extraction 
     tableQueue.Enqueue(table); 
    } 

    private void WriteQueuedData(object sender, EventArgs e) 
    { 
     BulkCopy(e.Table); 
    } 
} 

私の最初の質問はさておき、私はこれは私が必要とするすべてのだろう非同期ExtractData()を呼び出す場合、私は実際に加入するすべてのイベントを持っていないという事実から、でしょうか?第二に、私は方法については不明な何かがありますConcurrentQueue<T>機能とキューに入れられたオブジェクトと非同期に動作するトリガーのいくつかのフォームが必要ですか?

更新 私はちょうどOnItemQueuedイベントハンドラを持っているConcurrentQueue<T>からクラスを派生してきました。次に:

new public void Enqueue (DataTable Table) 
{ 
    base.Enqueue(Table); 
    OnTableQueued(new TableQueuedEventArgs(Table)); 
} 

public void OnTableQueued(TableQueuedEventArgs table) 
{ 
    EventHandler<TableQueuedEventArgs> handler = TableQueued; 

    if (handler != null) 
    { 
     handler(this, table); 
    } 
} 

この実装についての懸念はありますか?

答えて

18

問題を理解してから、いくつか不足しています。

同時キューは、データ構造を明示的にロックする必要なく、複数のスレッドがキューに読み書きできるように設計されたデータ構造です。 (ジャズは舞台裏で世話をしているか、ロックを取る必要がないようにコレクションが実装されています)

これを念頭に置きながら、試しているパターンのように見えます使用するのは "生産/消費者"です。まず、作業を生成するタスク(およびキューに項目を追加するタスク)があります。次に、2番目のタスクがあります。キューからアイテムを消費します(アイテムをデキューします)。

本当に2つのスレッドが必要です:1つはアイテムを追加し、もう1つはアイテムを削除します。並行コレクションを使用しているため、アイテムを追加する複数のスレッドとアイテムを削除する複数のスレッドを持つことができます。しかし、明らかに同時キューの競合が増えるほど、より速くボトルネックになります。

+0

私は2つのスレッドがあると思いました。主スレッドは、基本的にイベントがトリガーするのを待ちます。 2番目のスレッドは 'ExtractData()'への非同期呼び出しとして開始します。非同期コールバックでは、単に抽出プロセスを続行します。 – IAbstract

+0

実際、私はそれを後方に持っていると思います。メインスレッドはキューイングしているデータテーブルでなければなりません。エンキューされた項目イベントトリガーを介して非同期書き込みメソッドを開始します。 – IAbstract

3

これは私が思い付いた何のための完全なソリューションです:概念実証として

public class TableTransporter 
{ 
    private static int _indexer; 

    private CustomQueue tableQueue = new CustomQueue(); 
    private Func<DataTable, String> RunPostProcess; 
    private string filename; 

    public TableTransporter() 
    { 
     RunPostProcess = new Func<DataTable, String>(SerializeTable); 
     tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued); 
    } 

    void tableQueue_TableQueued(object sender, TableQueuedEventArgs e) 
    { 
     // do something with table 
     // I can't figure out is how to pass custom object in 3rd parameter 
     RunPostProcess.BeginInvoke(e.Table,new AsyncCallback(PostComplete), filename); 
    } 

    public void ExtractData() 
    { 
     // perform data extraction 
     tableQueue.Enqueue(MakeTable()); 
     Console.WriteLine("Table count [{0}]", tableQueue.Count); 
    } 

    private DataTable MakeTable() 
    { return new DataTable(String.Format("Table{0}", _indexer++)); } 

    private string SerializeTable(DataTable Table) 
    { 
     string file = Table.TableName + ".xml"; 

     DataSet dataSet = new DataSet(Table.TableName); 

     dataSet.Tables.Add(Table); 

     Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file); 
     string xmlstream = String.Empty; 

     using (MemoryStream memstream = new MemoryStream()) 
     { 
      XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet)); 
      XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8); 

      xmlSerializer.Serialize(xmlWriter, dataSet); 
      xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray()); 

      using (var fileStream = new FileStream(file, FileMode.Create)) 
       fileStream.Write(StringToUTF8ByteArray(xmlstream), 0, xmlstream.Length + 2); 
     } 
     filename = file; 

     return file; 
    } 

    private void PostComplete(IAsyncResult iasResult) 
    { 
     string file = (string)iasResult.AsyncState; 
     Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file); 

     RunPostProcess.EndInvoke(iasResult); 
    } 

    public static String UTF8ByteArrayToString(Byte[] ArrBytes) 
    { return new UTF8Encoding().GetString(ArrBytes); } 

    public static Byte[] StringToUTF8ByteArray(String XmlString) 
    { return new UTF8Encoding().GetBytes(XmlString); } 
} 

public sealed class CustomQueue : ConcurrentQueue<DataTable> 
{ 
    public event EventHandler<TableQueuedEventArgs> TableQueued; 

    public CustomQueue() 
    { } 
    public CustomQueue(IEnumerable<DataTable> TableCollection) 
     : base(TableCollection) 
    { } 

    new public void Enqueue (DataTable Table) 
    { 
     base.Enqueue(Table); 
     OnTableQueued(new TableQueuedEventArgs(Table)); 
    } 

    public void OnTableQueued(TableQueuedEventArgs table) 
    { 
     EventHandler<TableQueuedEventArgs> handler = TableQueued; 

     if (handler != null) 
     { 
      handler(this, table); 
     } 
    } 
} 

public class TableQueuedEventArgs : EventArgs 
{ 
    #region Fields 
    #endregion 

    #region Init 
    public TableQueuedEventArgs(DataTable Table) 
    {this.Table = Table;} 
    #endregion 

    #region Functions 
    #endregion 

    #region Properties 
    public DataTable Table 
    {get;set;} 
    #endregion 
} 

、かなりうまく動作するようです。たいてい私は4つのワーカースレッドを見た。

+0

TODO:新しい非同期メソッドで更新します。 – IAbstract

+0

これを見ると、良い実装ですが、クイックテストを実行したとき、アイテムがデキューされたときはいつですか? –

+0

@RichardPriddy:これはわずか5年前(*と私は3代目の会社*に移ってからずっと前です)以来、これは完全な例ではないと思います。最後に、*概念の証明*発言に注意してください。 ;)つまり、要件に応じて 'enqueued'イベントを公開し、他のものがデキューを処理できるようにすることができます。さもなければ、ポストプロセス関数の 'AsyncCallback'のどこかでデキューすることが論理的かもしれません。この後半に、より具体的なものを特定するのは本当に難しいでしょう。 – IAbstract

8

私は、ConcurrentQueueは非常にほんのわずかの場合にのみ有用だと思います。その主な利点は、ロックフリーです。しかし、通常、プロデューサスレッドは、処理可能なデータが何らかの形で消費者スレッドに通知されなければならない。スレッド間のこのシグナリングはロックを必要とし、ConcurrentQueueを使用する利点を無効にします。スレッドを同期させる最速の方法は、ロック内でのみ動作するMonitor.Pulse()を使用することです。他のすべての同期ツールはさらに低速です。

もちろん、消費者は待ち行列に何かが存在するかどうかを絶えず確認することができます。これはロックなしで動作しますが、プロセッサリソースは膨大な量です。消費者がチェックの間に待っていれば少し良いです。

キューに書き込むときにスレッドを発生させることは非常に悪い考えです。 ConcurrentQueueを使用して1ミリ秒を節約すると、イベントハンドラを実行すると1000ミリ秒かかることがあります。

すべての処理がイベントハンドラまたは非同期呼び出しで行われる場合、なぜキューが必要なのかという疑問があります。データを直接ハンドラに渡し、キューをまったく使用しないこと。

ConcurrentQueueの実装は、並行性を可能にするにはかなり複雑です。ほとんどの場合、通常のキュー<>を使用し、キューへのアクセスをすべてロックします。待ち行列のアクセスにはマイクロ秒しか必要ないので、2つのスレッドが同じマイクロ秒で待ち行列にアクセスすることはほとんどなく、ロックのために遅延がほとんどありません。ロックを伴う通常のキュー<を使用すると、ConcurrentQueueよりもコード実行が速くなることがよくあります。

+0

賛成投票を受けるのは恥ずかしいです。私はそれが有効で実用的な意見だと思う。 – user3085342

関連する問題