2011-01-19 12 views
9

現在、F#プログラムのパフォーマンスを向上させて、C#相当の高速化を図っています。プログラムは、ピクセルのバッファにフィルタ配列を適用します。メモリへのアクセスは、常にポインタを使用して行われます。ここF#画像操作のパフォーマンスの問題

は、画像の各画素に適用されるC#コードである:

unsafe private static byte getPixelValue(byte* buffer, double* filter, int filterLength, double filterSum) 
{ 
    double sum = 0.0; 
    for (int i = 0; i < filterLength; ++i) 
    { 
     sum += (*buffer) * (*filter); 
     ++buffer; 
     ++filter; 
    } 

    sum = sum/filterSum; 

    if (sum > 255) return 255; 
    if (sum < 0) return 0; 
    return (byte) sum; 
} 

F#コードは次のように見え、限りC#プログラムの3倍を取る:

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte = 

    let rec accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) i = 
     if i > 0 then 
      let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter)) 
      accumulatePixel newAcc (NativePtr.add buffer 1) (NativePtr.add filter 1) (i-1) 
     else 
      acc 

    let acc = (accumulatePixel 0.0 buffer filterData filterLength)/filterSum     

    match acc with 
     | _ when acc > 255.0 -> 255uy 
     | _ when acc < 0.0 -> 0uy 
     | _ -> byte acc 

F#で可変変数とforループを使用すると、再帰を使用するのと同じ速度になります。すべてのプロジェクトは、コード最適化をオンにしてリリースモードで実行するように設定されています。

どのようにF#バージョンのパフォーマンスを改善できますか?

EDIT:

ボトルネックは(NativePtr.get buffer offset)であると思われます。このコードを固定値で置き換え、C#バージョンの対応するコードを固定値で置き換えると、どちらのプログラムでもほぼ同じ速度になります。実際、C#では速度はまったく変わらないが、F#では大きな違いがある。

この動作は変更される可能性がありますか、F#のアーキテクチャに深く根ざしていますか?

EDIT 2:

Iは、forループを使用するために、再度コードをリファクタリング。実行速度は同じまま:

let mutable acc <- 0.0 
let mutable f <- filterData 
let mutable b <- tBuffer 
for i in 1 .. filter.FilterLength do 
    acc <- acc + (float (NativePtr.read b)) * (NativePtr.read f) 
    f <- NativePtr.add f 1 
    b <- NativePtr.add b 1 

Iは(NativePtr.read b)を使用するバージョンとそれが固定値111uy代わりのポインタから読み出しを使用することを除いて同じである別のバージョンのILコードを比較すると、のみILコード変更の次の行:

111uy

(NativePtr.read b)はIL-コード行ldloc.s bldobj uint8(1.4秒)を有するIL-コードldc.i4.s 0x6f(0.3秒)を有し

比較のため:C#は0.4秒でフィルタリングを行います。

イメージバッファからの読み取り中にフィルタを読み取ってもパフォーマンスに影響を与えないという事実は、いくらか混乱します。私は画像の行をフィルタ処理する前に、私は行の長さを持つバッファに行をコピーします。そのため、読み取り操作はイメージ全体に広がっていませんが、このバッファ内にあり、サイズは約800バイトです。

+0

最新のコメントに基づいて、私はあればC#コンパイラは、代わりに 'ldind.u1'を使用するという事実を疑問に思います'ldobj uint8'(F#が使用する意味的に等価なILです)では違いがあります。 'ldobj uint8'を' ldind.u1'に置き換えてilasmを実行して、パフォーマンスが異なるかどうかを調べることができます。 – kvb

+0

私はそれを交換しましたが、違いはありませんでした。とにかくありがとう。 –

答えて

12

我々はC#コンパイラ(関連部分)によって生成され、並列に両方のバッファを横切る内側ループの実際のILコードを見れば:

L_0017: ldarg.0 
L_0018: ldc.i4.1 
L_0019: conv.i 
L_001a: add 
L_001b: starg.s buffer 
L_001d: ldarg.1 
L_001e: ldc.i4.8 
L_001f: conv.i 
L_0020: add 

及びF#コンパイラ:

L_0017: ldc.i4.1 
L_0018: conv.i 
L_0019: sizeof uint8 
L_001f: mul 
L_0020: add 
L_0021: ldarg.2 
L_0022: ldc.i4.1 
L_0023: conv.i 
L_0024: sizeof float64 
L_002a: mul 
L_002b: add 

C#コードはaddの演算子しか使用しませんが、F#はmuladdの両方を必要とします。しかし、明らかに各ステップでは、アドレスを計算するのではなく(sizeof byteとsizeof float)ポインタをインクリメントするだけです(addrBase +(sizeof byte))。F#mulは不要です(常に1を掛けます)。

[<NoDynamicInvocation>] 
let inline add (x : nativeptr<'a>) (n:int) : nativeptr<'a> = to_nativeint x + nativeint n * (# "sizeof !0" type('a) : nativeint #) |> of_nativeint 

だから、それはF#で「深く根ざし」ていない、それはmodule NativePtrincdec機能が欠けているだけのことだ:

ことの原因は、F#は唯一add : nativeptr<'T> -> int -> nativeptr<'T>オペレータを提供しながら、C#はポインタのため++オペレータを定義することです。

Btw、引数が生ポインタの代わりに配列として渡された場合、上記のサンプルがより簡潔な方法で書かれている可能性があります。

UPDATE:

だから次のコードは、(C#ILと非常に似て生成すると思われる)わずか1%のスピードアップを持っているん:

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte = 

    let rec accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) i = 
     if i > 0 then 
      let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter)) 
      accumulatePixel newAcc (NativePtr.ofNativeInt <| (NativePtr.toNativeInt buffer) + (nativeint 1)) (NativePtr.ofNativeInt <| (NativePtr.toNativeInt filter) + (nativeint 8)) (i-1) 
     else 
      acc 

    let acc = (accumulatePixel 0.0 buffer filterData filterLength)/filterSum     

    match acc with 
     | _ when acc > 255.0 -> 255uy 
     | _ when acc < 0.0 -> 0uy 
     | _ -> byte acc 

もう一つの考え:それも依存するかもしれませんあなたのテストがgetPixelValueを呼び出す回数(F#はこの関数を2つのメソッドに分割し、C#は1つのメソッドを分割します)。

ここにテストコードを掲載することはできますか?

配列について - 私はコードが少なくとも簡潔であると思います(unsafeではなく)。

UPDATE#2:

ここで、実際のボトルネックがbyte->float変換であるように見えます。

C番号:

L_0003: ldarg.1 
L_0004: ldind.u1 
L_0005: conv.r8 

F#:C#はbyte->float64のみを行いながらbyte->float32->float64

L_000c: ldarg.1 
L_000d: ldobj uint8 
L_0012: conv.r.un 
L_0013: conv.r8 

F#は以下のパスを使用していくつかの理由。理由は分かりませんが、私のF#バージョンは、gradbotテストサンプル(BTW、テストにはgradbotありがとうございました!):

let inline preadConvert (p : nativeptr<byte>) = (# "conv.r8" (# "ldobj !0" type (byte) p : byte #) : float #) 
let inline pinc (x : nativeptr<'a>) : nativeptr<'a> = NativePtr.toNativeInt x + (# "sizeof !0" type('a) : nativeint #) |> NativePtr.ofNativeInt 

let rec accumulatePixel_ed (acc, buffer, filter, i) = 
     if i > 0 then 
      accumulatePixel_ed 
       (acc + (preadConvert buffer) * (NativePtr.read filter), 
       (pinc buffer), 
       (pinc filter), 
       (i-1)) 
     else 
      acc 

結果:

let rec accumulatePixel_ed_last (acc, buffer, filter, i) = 
     if i > 0 then 
      accumulatePixel_ed_last 
       (acc + (float << int16 <| NativePtr.read buffer) * (NativePtr.read filter), 
       (NativePtr.add buffer 1), 
       (NativePtr.add filter 1), 
       (i-1)) 
     else 
      acc 

我々がやらなければならない:それは私たちも、任意のハックなしで同じ速度を達成できることが判明し

adrian 6374985677.162810 1408.870900 ms 
    gradbot 6374985677.162810 1218.908200 ms 
     C# 6374985677.162810 227.832800 ms 
C# Offset 6374985677.162810 224.921000 ms 
    mutable 6374985677.162810 1254.337300 ms 
    ed'ka 6374985677.162810 227.543100 ms 

LAST UPDATE byteint16に、次にfloatに変換することです。このようにして '高価な' 命令は避けられます。 "プリム-types.fs" から

PS関連する変換コード:大きなテストに

let inline float (x: ^a) = 
    (^a : (static member ToDouble : ^a -> float) (x)) 
    when ^a : float  = (# "" x : float #) 
    when ^a : float32 = (# "conv.r8" x : float #) 
    // [skipped] 
    when ^a : int16  = (# "conv.r8" x : float #) 
    // [skipped] 
    when ^a : byte  = (# "conv.r.un conv.r8" x : float #) 
    when ^a : decimal = (System.Convert.ToDouble((# "" x : decimal #))) 
+0

この興味深いアイデアをありがとう!私は乗算を使用しないポインタ用の私のカスタム追加関数を作成しました。スピードアップは、実行時間全体の約1%に過ぎません。アレイバージョンはまだ高性能であると思いますか? –

+4

素敵な探偵作品! – kvb

+0

うわー!私は本当に感心しました:-) –

1

これはどのように比較されますか?それはNativePtrへの呼び出しが少なくなっています。

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte = 
    let accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) length = 
     let rec accumulate acc offset = 
      if offset < length then 
       let newAcc = acc + (float (NativePtr.get buffer offset) * (NativePtr.get filter offset)) 
       accumulate newAcc (offset + 1) 
      else 
       acc 
     accumulate acc 0 

    let acc = (accumulatePixel 0.0 buffer filterData filterLength)/filterSum     

    match acc with 
     | _ when acc > 255.0 -> 255uy 
     | _ when acc < 0.0 -> 0uy 
     | _ -> byte acc 

F#NativePtrのソースコード。

[<NoDynamicInvocation>] 
[<CompiledName("AddPointerInlined")>] 
let inline add (x : nativeptr<'T>) (n:int) : nativeptr<'T> = toNativeInt x + nativeint n * (# "sizeof !0" type('T) : nativeint #) |> ofNativeInt 

[<NoDynamicInvocation>] 
[<CompiledName("GetPointerInlined")>] 
let inline get (p : nativeptr<'T>) n = (# "ldobj !0" type ('T) (add p n) : 'T #) 
+0

ありがとう!これは良い考えですが、パフォーマンスの向上はあまりありません。 –

+0

私は(NativePtr.getバッファオフセット)は、(NativePtr.read(NativePtr.addバッファオフセット))とほぼ同じくらい速いと思いますが、おそらくそれはシーンの背後で同じことをするでしょう。 –

+0

あなたは正しいです。私は答えにF#のソースコードを追加しました。 – gradbot

1

私の結果。

adrian 6374730426.098020 1561.102500 ms 
    gradbot 6374730426.098020 1842.768000 ms 
     C# 6374730426.098020 150.793500 ms 
C# Offset 6374730426.098020 150.318900 ms 
    mutable 6374730426.098020 1446.616700 ms 

F#のテストコード

open Microsoft.FSharp.NativeInterop 
open System.Runtime.InteropServices 
open System.Diagnostics 

open AccumulatePixel 
#nowarn "9" 

let test size fn = 
    let bufferByte = Marshal.AllocHGlobal(size * 4) 
    let bufferFloat = Marshal.AllocHGlobal(size * 8) 

    let bi = NativePtr.ofNativeInt bufferByte 
    let bf = NativePtr.ofNativeInt bufferFloat 

    let random = System.Random() 

    for i in 1 .. size do 
     NativePtr.set bi i (byte <| random.Next() % 256) 
     NativePtr.set bf i (random.NextDouble()) 

    let duration (f, name) = 
     let stopWatch = Stopwatch.StartNew() 
     let time = f(0.0, bi, bf, size) 
     stopWatch.Stop() 
     printfn "%10s %f %f ms" name time stopWatch.Elapsed.TotalMilliseconds 

    List.iter duration fn 

    Marshal.FreeHGlobal bufferFloat 
    Marshal.FreeHGlobal bufferByte 

let rec accumulatePixel_adrian (acc, buffer, filter, i) = 
    if i > 0 then 
     let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter)) 
     accumulatePixel_adrian (newAcc, (NativePtr.add buffer 1), (NativePtr.add filter 1), (i - 1)) 
    else 
     acc 

let accumulatePixel_gradbot (acc, buffer, filter, length) = 
    let rec accumulate acc offset = 
     if offset < length then 
      let newAcc = acc + (float (NativePtr.get buffer offset) * (NativePtr.get filter offset)) 
      accumulate newAcc (offset + 1) 
     else 
      acc 
    accumulate acc 0 

let accumulatePixel_mutable (acc, buffer, filter, length) = 
    let mutable acc = 0.0 
    let mutable f = filter 
    let mutable b = buffer 
    for i in 1 .. length do 
     acc <- acc + (float (NativePtr.read b)) * (NativePtr.read f) 
     f <- NativePtr.add f 1 
     b <- NativePtr.add b 1 
    acc 

[ 
    accumulatePixel_adrian, "adrian"; 
    accumulatePixel_gradbot, "gradbot"; 
    AccumulatePixel.getPixelValue, "C#"; 
    AccumulatePixel.getPixelValueOffset, "C# Offset"; 
    accumulatePixel_mutable, "mutable"; 
] 
|> test 100000000 

System.Console.ReadLine() |> ignore 

C#のテストコード

namespace AccumulatePixel 
{ 
    public class AccumulatePixel 
    { 
     unsafe public static double getPixelValue(double sum, byte* buffer, double* filter, int filterLength) 
     { 
      for (int i = 0; i < filterLength; ++i) 
      { 
       sum += (*buffer) * (*filter); 
       ++buffer; 
       ++filter; 
      } 

      return sum; 
     } 

     unsafe public static double getPixelValueOffset(double sum, byte* buffer, double* filter, int filterLength) 
     { 
      for (int i = 0; i < filterLength; ++i) 
      { 
       sum += buffer[i] * filter[i]; 
      } 

      return sum; 
     } 
    } 
}