2011-08-20 9 views
5

私はHTTPハンドラを持っていて、すべての要求をメモリ内の並行キューコレクションに格納しています。特定の時間の後、コレクションをデータベースに一括して挿入します。メモリにデータを保持する

これは悪い考えですか?ボリュームが高いので、これはより良いアプローチのIMOであるようです。

並行しているコレクションをフラッシュしている間に、スレッドをロックしてロックし、その内容を一括して挿入してからコレクションを空にしている間に、いくつかの不一致(データベースのヒット数と格納された要素の数の比較)があります。コレクションからロックを削除します。

より良い方法はありますか?またはあなたは似たようなことをしましたか?

+0

なぜあなたはこれをしたいですか? – Marc

+0

私は一度に1つの挿入またはトランザクションを行う場合、データアクセスが遅いため。 – DarthVader

+0

あなたが直面しているスレッドの問題については、元のコレクションを別のコレクション(newCollection)にコピーし、元のコレクションのロックをクリアして削除し、newCollectionを使用してデータベースに挿入してください。このアプローチでは、新しいリクエストは長時間ブロックされませんでした。 –

答えて

1

私は以下のコードで説明したのと全く同じことを行っています。スレッドセーフで、フラッシュや保留中の書き込みに呼び出すことができるフラッシュメソッドがあります。スレッドが書き込むスレッシュホールド数に達すると、キュー(リストは私の場合)を別のスレッドに送信して保存します。これは、最後にデータをフラッシュするためにmanualResetEventを使用することに注意してください(64個のリセットイベントがありますので、64個以上のバックグラウンドスレッドが書き込みを保留している場合は手動で待ちますが、あなたのデータベースが本当に遅い場合を除きほとんど起こりません)。このコードは、メモリからストリーミングされた数千万のレコードを処理するために使用されました(メモリーから20m行を書き込むのに約5分かかりましたが、データベースとして保存サーバー上で実行されていました。 (つまり、BulkSqlCopyオブジェクトとIDataReaderを使用して1秒間に1行ずつ)、リクエストロードを処理する必要があります(もちろん、あなたが書いているものとデータベースに依存しますが、コードはタスクに依存します)。

また、ファシリティ一括書き込みには、データをストリームするIDataReaderの最小限の実装を作成します。以下のコードを使用するためには、必要な処理が必要です。

public class DataImporter<T> 
{ 

    public DataImporter(string tableName, string readerName) 
    { 
     _tableName = tableName; 
     _readerName = readerName; 
    } 

    /// <summary> 
    /// This is the size of our bulk staging list. 
    /// </summary> 
    /// <remarks> 
    /// Note that the SqlBulkCopy object has a batch size property, which may not be the same as this value, 
    /// so records may not be going into the database in sizes of this staging value. 
    /// </remarks> 
    private int _bulkStagingListSize = 20000; 
    private List<ManualResetEvent> _tasksWaiting = new List<ManualResetEvent>(); 
    private string _tableName = String.Empty; 
    private string _readerName = String.Empty; 

    public void QueueForImport(T record) 
    { 
     lock (_listLock) 
     { 
      _items.Add(record); 
      if (_items.Count > _bulkStagingListSize) 
      { 
       SaveItems(_items); 
       _items = new List<T>(); 
      } 
     } 
    } 

    /// <summary> 
    /// This method should be called at the end of the queueing work to ensure to clear down our list 
    /// </summary> 
    public void Flush() 
    { 
     lock (_listLock) 
     { 
      SaveItems(_items); 
      _items = new List<T>(); 
      while (_tasksWaiting.Count > 64) 
      { 
       Thread.Sleep(2000); 
      } 
      WaitHandle.WaitAll(_tasksWaiting.ToArray()); 
     } 
    } 

    private void SaveItems(List<T> items) 
    { 
     ManualResetEvent evt = new ManualResetEvent(false); 
     _tasksWaiting.Add(evt); 
     IDataReader reader = DataReaderFactory.GetReader<T>(_readerName,_items); 
     Tuple<ManualResetEvent, IDataReader> stateInfo = new Tuple<ManualResetEvent, IDataReader>(evt, reader); 
     ThreadPool.QueueUserWorkItem(new WaitCallback(saveData), stateInfo); 

    } 

    private void saveData(object info) 
    { 
     using (new ActivityTimer("Saving bulk data to " + _tableName)) 
     { 
      Tuple<ManualResetEvent, IDataReader> stateInfo = info as Tuple<ManualResetEvent, IDataReader>; 
      IDataReader r = stateInfo.Item2; 
      try 
      { 
       Database.DataImportStagingDatabase.BulkLoadData(r, _tableName); 
      } 
      catch (Exception ex) 
      { 
       //Do something 
      } 
      finally 
      { 
       _tasksWaiting.Remove(stateInfo.Item1); 
       stateInfo.Item1.Set(); 
      } 
     } 
    } 

    private object _listLock = new object(); 

    private List<T> _items = new List<T>(); 
} 

DataReaderFactoryはちょうどストリーミングに使用する権利のIDataReader implmentationを選択し、以下のように見えるの下に参照さ:

internal static class DataReaderFactory 
{ 
    internal static IDataReader GetReader<T>(string typeName, List<T> items) 
    { 
     IDataReader reader = null; 
     switch(typeName) 
     { 
      case "ProductRecordDataReader": 
       reader = new ProductRecordDataReader(items as List<ProductRecord>) as IDataReader; 
       break; 
      case "RetailerPriceRecordDataReader": 
       reader = new RetailerPriceRecordDataReader(items as List<RetailerPriceRecord>) as IDataReader; 
       break; 
      default: 
       break; 
     } 
     return reader; 
    } 
} 

私は(althoghtは、このコードは動作します。この場合に使用されるデータリーダー実装)任意のデータ読取装置と以下の通りである:次のように

/// <summary> 
/// This class creates a data reader for ProductRecord data. This is used to stream the records 
/// to the SqlBulkCopy object. 
/// </summary> 
public class ProductRecordDataReader:IDataReader 
{ 
    public ProductRecordDataReader(List<ProductRecord> products) 
    { 
     _products = products.ToList(); 
    } 

    List<ProductRecord> _products; 

    int currentRow; 
    int rowCounter = 0; 
    public int FieldCount 
    { 
     get 
     { 
      return 14; 
     } 
    } 


    #region IDataReader Members 

    public void Close() 
    { 
     //Do nothing. 
    } 

    public bool Read() 
    { 
     if (rowCounter < _products.Count) 
     { 
      currentRow = rowCounter; 
      rowCounter++; 
      return true; 
     } 
     else 
     { 
      return false; 
     } 

    } 

    public int RecordsAffected 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public string GetName(int i) 
    { 
     switch (i) 
     { 
      case 0: 
       return "ProductSku"; 
      case 1: 
       return "UPC"; 
      case 2: 
       return "EAN"; 
      case 3: 
       return "ISBN"; 
      case 4: 
       return "ProductName"; 
      case 5: 
       return "ShortDescription"; 
      case 6: 
       return "LongDescription"; 
      case 7: 
       return "DFFCategoryNumber"; 
      case 8: 
       return "DFFManufacturerNumber"; 
      case 9: 
       return "ManufacturerPartNumber"; 
      case 10: 
       return "ManufacturerModelNumber"; 
      case 11: 
       return "ProductImageUrl"; 
      case 12: 
       return "LowestPrice"; 
      case 13: 
       return "HighestPrice"; 
      default: 
       return null; 
     } 

    } 

    public int GetOrdinal(string name) 
    { 
     switch (name) 
     { 
      case "ProductSku": 
       return 0; 
      case "UPC": 
       return 1; 
      case "EAN": 
       return 2; 
      case "ISBN": 
       return 3; 
      case "ProductName": 
       return 4; 
      case "ShortDescription": 
       return 5; 
      case "LongDescription": 
       return 6; 
      case "DFFCategoryNumber": 
       return 7; 
      case "DFFManufacturerNumber": 
       return 8; 
      case "ManufacturerPartNumber": 
       return 9; 
      case "ManufacturerModelNumber": 
       return 10; 
      case "ProductImageUrl": 
       return 11; 
      case "LowestPrice": 
       return 12; 
      case "HighestPrice": 
       return 13; 
      default: 
       return -1; 
     } 

    } 

    public object GetValue(int i) 
    { 
     switch (i) 
     { 
      case 0: 
       return _products[currentRow].ProductSku; 
      case 1: 
       return _products[currentRow].UPC; 
      case 2: 
       return _products[currentRow].EAN; 
      case 3: 
       return _products[currentRow].ISBN; 
      case 4: 
       return _products[currentRow].ProductName; 
      case 5: 
       return _products[currentRow].ShortDescription; 
      case 6: 
       return _products[currentRow].LongDescription; 
      case 7: 
       return _products[currentRow].DFFCategoryNumber; 
      case 8: 
       return _products[currentRow].DFFManufacturerNumber; 
      case 9: 
       return _products[currentRow].ManufacturerPartNumber; 
      case 10: 
       return _products[currentRow].ManufacturerModelNumber; 
      case 11: 
       return _products[currentRow].ProductImageUrl; 
      case 12: 
       return _products[currentRow].LowestPrice; 
      case 13: 
       return _products[currentRow].HighestPrice; 
      default: 
       return null; 
     } 

    } 

    #endregion 

    #region IDisposable Members 

    public void Dispose() 
    { 
     //Do nothing; 
    } 

    #endregion 

    #region IDataRecord Members 

    public bool NextResult() 
    { 
     throw new NotImplementedException(); 
    } 

    public int Depth 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public DataTable GetSchemaTable() 
    { 
     throw new NotImplementedException(); 
    } 

    public bool IsClosed 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public bool GetBoolean(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public byte GetByte(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) 
    { 
     throw new NotImplementedException(); 
    } 

    public char GetChar(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) 
    { 
     throw new NotImplementedException(); 
    } 

    public IDataReader GetData(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public string GetDataTypeName(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public DateTime GetDateTime(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public decimal GetDecimal(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public double GetDouble(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public Type GetFieldType(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public float GetFloat(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public Guid GetGuid(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public short GetInt16(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public int GetInt32(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public long GetInt64(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public string GetString(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public int GetValues(object[] values) 
    { 
     throw new NotImplementedException(); 
    } 

    public bool IsDBNull(int i) 
    { 
     throw new NotImplementedException(); 
    } 

    public object this[string name] 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public object this[int i] 
    { 
     get { throw new NotImplementedException(); } 
    } 

    #endregion 
} 

最後にバルク・ロード・データ・方法が見える:

public void BulkLoadData(IDataReader reader, string tableName) 
    { 
     using (SqlConnection cnn = new SqlConnection(cnnString)) 
     { 
      SqlBulkCopy copy = new SqlBulkCopy(cnn); 
      copy.DestinationTableName = tableName; 
      copy.BatchSize = 10000; 
      cnn.Open(); 
      copy.WriteToServer(reader); 
     } 
    } 

しかし、私はあなたが別の答え(特にIISのワーカープロセスのリサイクル)で指摘した理由でasp.netでこのコードを使用しないことをお勧めします。非常に軽量のキューを使用して、要求データを最初に再起動しない別のサービスに送信することをお勧めします(ZeroMQを使用して、私が書いているASP.NETアプリケーションから要求をストリーミングしてデータをロギングする....非常にパフォーマンスが良いです)。

マイク。

1

は、私は[...]による

ここで基本的なことは、2つのキューとサイクルそれらを使用することであるスレッドに、いくつかの矛盾を参照ください。 1は受信用、1は挿入用です。あなたは受信をロックする必要があり、競合はほとんどありません。

+0

私はバルク挿入を行うことができるリストにそのキューをダンプするよりも、受信に1つのキューを使用します。私は同時ロックキューを使用するので、ロックする必要はありません。私はリストを表示するためにコンテンツをダンプしている間だけキューをロックし、ロックを一括して挿入して削除します。あなたはどう思いますか?私は良くできますか? – DarthVader

+0

さて、リストのコピーはdb挿入より速いですが、まだ2つのキューが良いかもしれません。場合によります。コンカレントキューは、スイッチングを少しトリッキーにします。 –

+0

そして、ConcurrentQueueに問題があります。本当に大きくなるのですか? http://connect.microsoft.com/VisualStudio/feedback/details/552868/memory-leak-in-concurrentqueue-t-class-dequeued-enteries-are-still-rooted –

2

申し訳ありませんが、私は悪い考えであると言います。次のような問題があります。データが挿入されたとき

  • データがデータベースに書き込まれる前に、アプリケーションプールがリサイクル場合は、同じコレクション内のすべてのデータを維持するデータ
  • を失うことになると、そのコレクションをロックする必要性につながりますデータがディスクに書き込まれ、コレクションがクリアされたときです。これにより、一括挿入中にサイト全体が一時停止する可能性があります。
  • 余分な手順でコードが複雑になります。スレッドの問題を修正するのが難しい

ピークロード時にSQL Serverデータベースに毎秒1000行を書き込むWebアプリケーションを作成しました。

可能な限りシンプルにアプリケーションを書き込んでから、パフォーマンスをテストしてみてください。あなたがデータベースに挿入することができたで

速度は、ハードウェアにたくさん依存しますが、あなたがあなたのプログラムで行うことができますものもあります。

  • だけのテーブルの上に一つのインデックス(クラスタ化)を持っています。キーの自動番号。
  • できるだけ早くデータベースへの接続を解除するようにしてください。
+0

私はあなたの弾丸を認識しています。前述のロックを行う。 SQL Serverにはどのようにして1000行/秒を書きましたか?あなたはSQL Serverに毎秒10000行を行うことができますか? – DarthVader

0

他にも、sqlite(プールレシピの問題を避けるため)のようなデータベースでディスクに送信し、SQL Serverデータベースに送信することができます。

私はリアクティブエクステンションを使用して挿入キューを作成し、優れた速度で動作します。

関連する問題