2011-09-11 5 views
2

C#でマルチスレッド化するのが初めてです。私はサイズ(x)(y)(z)の3D配列を持っていて、すべての(x、y)値のすべてのzサンプルの平均を計算したいと言います。私はマルチスレッド(2つのスレッド)を使用して、スレッド1に処理するためのサイズ(x/2)* y * zの配列の半分を送信し、残りの半分をスレッド2に送信したいとします。C#クエリでの突発的な処理

どうすればよいですか?個々のスレッドから引数を渡して取得するにはどうすればよいですか?コード例が役に立ちます。

よろしく

答えて

8

私は自分自身をこのためPLINQを使用しての代わりにこれをスレッドをお勧めします。

LINQ構文を使用してクエリを実行できますが、すべてのコアで自動的に並列化できます。

0

いくつかのCPU上でジョブを配布するための低オーバーヘッドスケジューラを実装するのがちょっと難しいため、何かPLINQ(ReedやParallel.Forなど)を使用するのが理にかなっている理由はたくさんあります。

私が正しくあなたを理解している場合ので、多分これはあなたが始めることができた(私の4コアマシンで並列バージョンは3倍速いシングルコア版より):

using System; 
using System.Diagnostics; 
using System.Linq; 
using System.Threading.Tasks; 

class Program 
{ 

    static void AverageOfZ (
     double[] input, 
     double[] result, 
     int x, 
     int y, 
     int z 
     ) 
    { 
     Debug.Assert(input.Length == x*y*z); 
     Debug.Assert(result.Length == x*y); 

     //Replace Parallel with Sequential to compare with non-parallel loop 
     //Sequential.For(
     Parallel.For(
      0, 
      x*y, 
      i => 
       { 
        var begin = i*z; 
        var end = begin + z; 
        var sum = 0.0; 

        for (var iter = begin; iter < end; ++iter) 
        { 
         sum += input[iter]; 
        } 

        result[i] = sum/z; 
       }); 

    } 

    static void Main(string[] args) 
    { 
     const int X = 64; 
     const int Y = 64; 
     const int Z = 64; 
     const int Repetitions = 40000; 

     var random = new Random(19740531); 
     var samples = Enumerable.Range(0, X*Y*Z).Select(x => random.NextDouble()).ToArray(); 
     var result = new double[X*Y]; 

     var then = DateTime.Now; 

     for (var iter = 0; iter < Repetitions; ++iter) 
     { 
      AverageOfZ(samples, result, X, Y, Z); 
     } 

     var diff = DateTime.Now - then; 
     Console.WriteLine(
      "{0} samples processed {1} times in {2} seconds", 
      samples.Length, 
      Repetitions, 
      diff.TotalSeconds 
      ); 

    } 
} 

static class Sequential 
{ 
    public static void For(int from, int to, Action<int> action) 
    { 
     for (var iter = from; iter < to; ++iter) 
     { 
      action(iter); 
     } 
    } 
} 

PS。同時実行に向かうときは、別のコアがメモリにアクセスする方法を考慮することが重要です。そうしないとパフォーマンスが失われるのは非常に簡単です。

+0

総合的な返信をありがとう。しかし、PLINQは.NET 3.5以前か.NET 4.0より前のバージョンで利用可能ですか?フレームワークの制限のために.NET 3.5に限定されているので私は尋ねます。 – user938972

+0

Parallelは.NET4に追加されています。 Parallel.Forを実装することは、スレッド間のタスクを盗んだり、ユーザーWin7ユーザーモードのスレッド(オーバーヘッドを低くする)のためにかなりクールなものがあるため、想像できるものより少し難しいです。しかし、高性能並列コンピューティングが必要だが、フレームワークを書くのが好きでない場合は、PPL(Parallel for C++)を使用してVS2010でC++ライブラリを作成し、静的リンクを使用してライブラリを作成し、そこから使用する自己完結型DLLを作成することができます。 NET35 – FuleSnabel

0

Dot Net 3.5以降では、マルチスレッドの場合はParallel、Async IOの場合はAsyncなどの複雑さを抽象化する多くのショートカットキーワードが導入されています。残念ながら、これはまた、これらのタスクに関わるものを理解する機会を提供しません。たとえば、私の同僚は、認証トークンを返したログインメソッドにAsyncを最近使用しようとしていました。

ここでは、ご使用のシナリオ用の完全なマルチスレッドサンプルコードを示します。 Xは経度 Yは Lattitudeであり、Zは、サンプル・コードは、各における降雨サンプル座標ワークデザインパターンの単位となる次座標における降雨サンプル

である:サンプルコードは、そのふりそれをよりリアル作ります作業項目。また、バックグラウンドスレッドプールを使用する代わりに、個別のフォアグランドスレッドを作成します。 作業項目の簡素化と計算時間の短縮のために、私はスレッド同期ロックを2つのロックに分割しました。 1つは作業キュー用、もう1つは出力データ用です。 注:私はLyncなどのドットネットショートカットを使用していないので、このコードはDot Net 2.0でも実行する必要があります。

実際のアプリケーション開発では、以下のようなものは、連続した作業項目のストリーム処理などの複雑なシナリオでのみ必要となります。この場合、スレッドが効果的に出力データバッファを定期的にクリアする必要があります永遠に走る

public static class MultiThreadSumRainFall 
{ 
    const int LongitudeSize = 64; 
    const int LattitudeSize = 64; 
    const int RainFallSamplesSize = 64; 
    const int SampleMinValue = 0; 
    const int SampleMaxValue = 1000; 
    const int ThreadCount = 4; 

    public static void SumRainfallAndOutputValues() 
    { 
     int[][][] SampleData; 
     SampleData = GenerateSampleRainfallData(); 
     for (int Longitude = 0; Longitude < LongitudeSize; Longitude++) 
     { 
      for (int Lattitude = 0; Lattitude < LattitudeSize; Lattitude++) 
      { 
       QueueWork(new WorkItem(Longitude, Lattitude, SampleData[Longitude][Lattitude])); 
      } 
     } 
     System.Threading.ThreadStart WorkThreadStart; 
     System.Threading.Thread WorkThread; 
     List<System.Threading.Thread> RunningThreads; 
     WorkThreadStart = new System.Threading.ThreadStart(ParallelSum); 
     int NumThreads; 
     NumThreads = ThreadCount; 
     if (ThreadCount < 1) 
     { 
      NumThreads = 1; 
     } 
     else if (NumThreads > (Environment.ProcessorCount + 1)) 
     { 
      NumThreads = Environment.ProcessorCount + 1; 
     } 
     OutputData = new int[LongitudeSize, LattitudeSize]; 
     RunningThreads = new List<System.Threading.Thread>(); 
     for (int I = 0; I < NumThreads; I++) 
     { 
      WorkThread = new System.Threading.Thread(WorkThreadStart); 
      WorkThread.Start(); 
      RunningThreads.Add(WorkThread); 
     } 
     bool AllThreadsComplete; 
     AllThreadsComplete = false; 
     while (!AllThreadsComplete) 
     { 
      System.Threading.Thread.Sleep(100); 
      AllThreadsComplete = true; 
      foreach (System.Threading.Thread WorkerThread in RunningThreads) 
      { 
       if (WorkerThread.IsAlive) 
       { 
        AllThreadsComplete = false; 
       } 
      } 
     } 
     for (int Longitude = 0; Longitude < LongitudeSize; Longitude++) 
     { 
      for (int Lattitude = 0; Lattitude < LattitudeSize; Lattitude++) 
      { 
       Console.Write(string.Concat(OutputData[Longitude, Lattitude], @" ")); 
      } 
      Console.WriteLine(); 
     } 
    } 

    private class WorkItem 
    { 
     public WorkItem(int _Longitude, int _Lattitude, int[] _RainFallSamples) 
     { 
      Longitude = _Longitude; 
      Lattitude = _Lattitude; 
      RainFallSamples = _RainFallSamples; 
     } 
     public int Longitude { get; set; } 
     public int Lattitude { get; set; } 
     public int[] RainFallSamples { get; set; } 
    } 

    public static int[][][] GenerateSampleRainfallData() 
    { 
     int[][][] Result; 
     Random Rnd; 
     Rnd = new Random(); 
     Result = new int[LongitudeSize][][]; 
     for(int Longitude = 0; Longitude < LongitudeSize; Longitude++) 
     { 
      Result[Longitude] = new int[LattitudeSize][]; 
      for (int Lattidude = 0; Lattidude < LattitudeSize; Lattidude++) 
      { 
       Result[Longitude][Lattidude] = new int[RainFallSamplesSize]; 
       for (int Sample = 0; Sample < RainFallSamplesSize; Sample++) 
       { 
        Result[Longitude][Lattidude][Sample] = Rnd.Next(SampleMinValue, SampleMaxValue); 
       } 
      } 
     } 
     return Result; 
    } 

    private static object SyncRootWorkQueue = new object(); 
    private static Queue<WorkItem> WorkQueue = new Queue<WorkItem>(); 
    private static void QueueWork(WorkItem SamplesWorkItem) 
    { 
     lock(SyncRootWorkQueue) 
     { 
      WorkQueue.Enqueue(SamplesWorkItem); 
     } 
    } 
    private static WorkItem DeQueueWork() 
    { 
     WorkItem Samples; 
     Samples = null; 
     lock (SyncRootWorkQueue) 
     { 
      if (WorkQueue.Count > 0) 
      { 
       Samples = WorkQueue.Dequeue(); 
      } 
     } 
     return Samples; 
    } 
    private static int QueueSize() 
    { 
     lock(SyncRootWorkQueue) 
     { 
      return WorkQueue.Count; 
     } 
    } 

    private static object SyncRootOutputData = new object(); 
    private static int[,] OutputData; 
    private static void SetOutputData(int Longitude, int Lattitude, int SumSamples) 
    { 
     lock(SyncRootOutputData) 
     { 
      OutputData[Longitude, Lattitude] = SumSamples; 
     } 
    } 

    private static void ParallelSum() 
    { 
     WorkItem SamplesWorkItem; 
     int SummedResult; 
     SamplesWorkItem = DeQueueWork(); 
     while (SamplesWorkItem != null) 
     { 
      SummedResult = 0; 
      foreach (int SampleValue in SamplesWorkItem.RainFallSamples) 
      { 
       SummedResult += SampleValue; 
      } 
      SetOutputData(SamplesWorkItem.Longitude, SamplesWorkItem.Lattitude, SummedResult); 
      SamplesWorkItem = DeQueueWork(); 
     } 
    } 
} 
関連する問題