私は以下のコードで説明したのと全く同じことを行っています。スレッドセーフで、フラッシュや保留中の書き込みに呼び出すことができるフラッシュメソッドがあります。スレッドが書き込むスレッシュホールド数に達すると、キュー(リストは私の場合)を別のスレッドに送信して保存します。これは、最後にデータをフラッシュするためにmanualResetEventを使用することに注意してください(64個のリセットイベントがありますので、64個以上のバックグラウンドスレッドが書き込みを保留している場合は手動で待ちますが、あなたのデータベースが本当に遅い場合を除きほとんど起こりません)。このコードは、メモリからストリーミングされた数千万のレコードを処理するために使用されました(メモリーから20m行を書き込むのに約5分かかりましたが、データベースとして保存サーバー上で実行されていました。 (つまり、BulkSqlCopyオブジェクトとIDataReaderを使用して1秒間に1行ずつ)、リクエストロードを処理する必要があります(もちろん、あなたが書いているものとデータベースに依存しますが、コードはタスクに依存します)。
また、ファシリティ一括書き込みには、データをストリームするIDataReaderの最小限の実装を作成します。以下のコードを使用するためには、必要な処理が必要です。
public class DataImporter<T>
{
public DataImporter(string tableName, string readerName)
{
_tableName = tableName;
_readerName = readerName;
}
/// <summary>
/// This is the size of our bulk staging list.
/// </summary>
/// <remarks>
/// Note that the SqlBulkCopy object has a batch size property, which may not be the same as this value,
/// so records may not be going into the database in sizes of this staging value.
/// </remarks>
private int _bulkStagingListSize = 20000;
private List<ManualResetEvent> _tasksWaiting = new List<ManualResetEvent>();
private string _tableName = String.Empty;
private string _readerName = String.Empty;
public void QueueForImport(T record)
{
lock (_listLock)
{
_items.Add(record);
if (_items.Count > _bulkStagingListSize)
{
SaveItems(_items);
_items = new List<T>();
}
}
}
/// <summary>
/// This method should be called at the end of the queueing work to ensure to clear down our list
/// </summary>
public void Flush()
{
lock (_listLock)
{
SaveItems(_items);
_items = new List<T>();
while (_tasksWaiting.Count > 64)
{
Thread.Sleep(2000);
}
WaitHandle.WaitAll(_tasksWaiting.ToArray());
}
}
private void SaveItems(List<T> items)
{
ManualResetEvent evt = new ManualResetEvent(false);
_tasksWaiting.Add(evt);
IDataReader reader = DataReaderFactory.GetReader<T>(_readerName,_items);
Tuple<ManualResetEvent, IDataReader> stateInfo = new Tuple<ManualResetEvent, IDataReader>(evt, reader);
ThreadPool.QueueUserWorkItem(new WaitCallback(saveData), stateInfo);
}
private void saveData(object info)
{
using (new ActivityTimer("Saving bulk data to " + _tableName))
{
Tuple<ManualResetEvent, IDataReader> stateInfo = info as Tuple<ManualResetEvent, IDataReader>;
IDataReader r = stateInfo.Item2;
try
{
Database.DataImportStagingDatabase.BulkLoadData(r, _tableName);
}
catch (Exception ex)
{
//Do something
}
finally
{
_tasksWaiting.Remove(stateInfo.Item1);
stateInfo.Item1.Set();
}
}
}
private object _listLock = new object();
private List<T> _items = new List<T>();
}
DataReaderFactoryはちょうどストリーミングに使用する権利のIDataReader implmentationを選択し、以下のように見えるの下に参照さ:
internal static class DataReaderFactory
{
internal static IDataReader GetReader<T>(string typeName, List<T> items)
{
IDataReader reader = null;
switch(typeName)
{
case "ProductRecordDataReader":
reader = new ProductRecordDataReader(items as List<ProductRecord>) as IDataReader;
break;
case "RetailerPriceRecordDataReader":
reader = new RetailerPriceRecordDataReader(items as List<RetailerPriceRecord>) as IDataReader;
break;
default:
break;
}
return reader;
}
}
私は(althoghtは、このコードは動作します。この場合に使用されるデータリーダー実装)任意のデータ読取装置と以下の通りである:次のように
/// <summary>
/// This class creates a data reader for ProductRecord data. This is used to stream the records
/// to the SqlBulkCopy object.
/// </summary>
public class ProductRecordDataReader:IDataReader
{
public ProductRecordDataReader(List<ProductRecord> products)
{
_products = products.ToList();
}
List<ProductRecord> _products;
int currentRow;
int rowCounter = 0;
public int FieldCount
{
get
{
return 14;
}
}
#region IDataReader Members
public void Close()
{
//Do nothing.
}
public bool Read()
{
if (rowCounter < _products.Count)
{
currentRow = rowCounter;
rowCounter++;
return true;
}
else
{
return false;
}
}
public int RecordsAffected
{
get { throw new NotImplementedException(); }
}
public string GetName(int i)
{
switch (i)
{
case 0:
return "ProductSku";
case 1:
return "UPC";
case 2:
return "EAN";
case 3:
return "ISBN";
case 4:
return "ProductName";
case 5:
return "ShortDescription";
case 6:
return "LongDescription";
case 7:
return "DFFCategoryNumber";
case 8:
return "DFFManufacturerNumber";
case 9:
return "ManufacturerPartNumber";
case 10:
return "ManufacturerModelNumber";
case 11:
return "ProductImageUrl";
case 12:
return "LowestPrice";
case 13:
return "HighestPrice";
default:
return null;
}
}
public int GetOrdinal(string name)
{
switch (name)
{
case "ProductSku":
return 0;
case "UPC":
return 1;
case "EAN":
return 2;
case "ISBN":
return 3;
case "ProductName":
return 4;
case "ShortDescription":
return 5;
case "LongDescription":
return 6;
case "DFFCategoryNumber":
return 7;
case "DFFManufacturerNumber":
return 8;
case "ManufacturerPartNumber":
return 9;
case "ManufacturerModelNumber":
return 10;
case "ProductImageUrl":
return 11;
case "LowestPrice":
return 12;
case "HighestPrice":
return 13;
default:
return -1;
}
}
public object GetValue(int i)
{
switch (i)
{
case 0:
return _products[currentRow].ProductSku;
case 1:
return _products[currentRow].UPC;
case 2:
return _products[currentRow].EAN;
case 3:
return _products[currentRow].ISBN;
case 4:
return _products[currentRow].ProductName;
case 5:
return _products[currentRow].ShortDescription;
case 6:
return _products[currentRow].LongDescription;
case 7:
return _products[currentRow].DFFCategoryNumber;
case 8:
return _products[currentRow].DFFManufacturerNumber;
case 9:
return _products[currentRow].ManufacturerPartNumber;
case 10:
return _products[currentRow].ManufacturerModelNumber;
case 11:
return _products[currentRow].ProductImageUrl;
case 12:
return _products[currentRow].LowestPrice;
case 13:
return _products[currentRow].HighestPrice;
default:
return null;
}
}
#endregion
#region IDisposable Members
public void Dispose()
{
//Do nothing;
}
#endregion
#region IDataRecord Members
public bool NextResult()
{
throw new NotImplementedException();
}
public int Depth
{
get { throw new NotImplementedException(); }
}
public DataTable GetSchemaTable()
{
throw new NotImplementedException();
}
public bool IsClosed
{
get { throw new NotImplementedException(); }
}
public bool GetBoolean(int i)
{
throw new NotImplementedException();
}
public byte GetByte(int i)
{
throw new NotImplementedException();
}
public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
{
throw new NotImplementedException();
}
public char GetChar(int i)
{
throw new NotImplementedException();
}
public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
{
throw new NotImplementedException();
}
public IDataReader GetData(int i)
{
throw new NotImplementedException();
}
public string GetDataTypeName(int i)
{
throw new NotImplementedException();
}
public DateTime GetDateTime(int i)
{
throw new NotImplementedException();
}
public decimal GetDecimal(int i)
{
throw new NotImplementedException();
}
public double GetDouble(int i)
{
throw new NotImplementedException();
}
public Type GetFieldType(int i)
{
throw new NotImplementedException();
}
public float GetFloat(int i)
{
throw new NotImplementedException();
}
public Guid GetGuid(int i)
{
throw new NotImplementedException();
}
public short GetInt16(int i)
{
throw new NotImplementedException();
}
public int GetInt32(int i)
{
throw new NotImplementedException();
}
public long GetInt64(int i)
{
throw new NotImplementedException();
}
public string GetString(int i)
{
throw new NotImplementedException();
}
public int GetValues(object[] values)
{
throw new NotImplementedException();
}
public bool IsDBNull(int i)
{
throw new NotImplementedException();
}
public object this[string name]
{
get { throw new NotImplementedException(); }
}
public object this[int i]
{
get { throw new NotImplementedException(); }
}
#endregion
}
最後にバルク・ロード・データ・方法が見える:
public void BulkLoadData(IDataReader reader, string tableName)
{
using (SqlConnection cnn = new SqlConnection(cnnString))
{
SqlBulkCopy copy = new SqlBulkCopy(cnn);
copy.DestinationTableName = tableName;
copy.BatchSize = 10000;
cnn.Open();
copy.WriteToServer(reader);
}
}
しかし、私はあなたが別の答え(特にIISのワーカープロセスのリサイクル)で指摘した理由でasp.netでこのコードを使用しないことをお勧めします。非常に軽量のキューを使用して、要求データを最初に再起動しない別のサービスに送信することをお勧めします(ZeroMQを使用して、私が書いているASP.NETアプリケーションから要求をストリーミングしてデータをロギングする....非常にパフォーマンスが良いです)。
マイク。
なぜあなたはこれをしたいですか? – Marc
私は一度に1つの挿入またはトランザクションを行う場合、データアクセスが遅いため。 – DarthVader
あなたが直面しているスレッドの問題については、元のコレクションを別のコレクション(newCollection)にコピーし、元のコレクションのロックをクリアして削除し、newCollectionを使用してデータベースに挿入してください。このアプローチでは、新しいリクエストは長時間ブロックされませんでした。 –