2017-11-11 3 views

答えて

12

ここでは、(実験的な種類の)Base.Threadsスレッドモジュールを使用したソリューションです。

分散並列処理のためにpmap(など)を使用するソリューションは類似しています。プロセス間通信のオーバーヘッドがあなたを傷つけるだろうと私は思っていますが。

アイデアはブロック単位で(スレッドごとに1つずつ)ソートすることで、各スレッドは完全に独立していて、ブロックを処理するだけです。

次に、これらの事前ソートされたブロックをマージします。

これはソートされたリストをマージすることで、よく知られている問題です。これについてはquestionsも参照してください。

あなたが起動する前に、環境変数JULIA_NUM_THREADSを設定して、マルチスレッドを自分で設定することを忘れないでください。私はいくつかのベンチマークを行っている

using Base.Threads 

function blockranges(nblocks, total_len) 
    rem = total_len % nblocks 
    main_len = div(total_len, nblocks) 

    starts=Int[1] 
    ends=Int[] 
    for ii in 1:nblocks 
     len = main_len 
     if rem>0 
      len+=1 
      rem-=1 
     end 
     push!(ends, starts[end]+len-1) 
     push!(starts, ends[end] + 1) 
    end 
    @assert ends[end] == total_len 
    starts[1:end-1], ends 
end 

function threadedsort!(data::Vector) 
    starts, ends = blockranges(nthreads(), length(data)) 

    # Sort each block 
    @threads for (ss, ee) in collect(zip(starts, ends)) 
     @inbounds sort!(@view data[ss:ee]) 
    end 


    # Go through each sorted block taking out the smallest item and putting it in the new array 
    # This code could maybe be optimised. see https://stackoverflow.com/a/22057372/179081 
    ret = similar(data) # main bit of allocation right here. avoiding it seems expensive. 
    # Need to not overwrite data we haven't read yet 
    @inbounds for ii in eachindex(ret) 
     minblock_id = 1 
     ret[ii]=data[starts[1]] 
     @inbounds for blockid in 2:endof(starts) # findmin allocates a lot for some reason, so do the find by hand. (maybe use findmin! ?) 
      ele = data[starts[blockid]] 
      if ret[ii] > ele 
       ret[ii] = ele 
       minblock_id = blockid 
      end 
     end 
     starts[minblock_id]+=1 # move the start point forward 
     if starts[minblock_id] > ends[minblock_id] 
      deleteat!(starts, minblock_id) 
      deleteat!(ends, minblock_id) 
     end 
    end 
    data.=ret # copy back into orignal as we said we would do it inplace 
    return data 
end 

using Plots 
function evaluate_timing(range) 
    sizes = Int[] 
    threadsort_times = Float64[] 
    sort_times = Float64[] 
     for sz in 2.^collect(range) 
      data_orig = rand(Int, sz) 
      push!(sizes, sz) 

      data = copy(data_orig) 
      push!(sort_times,  @elapsed sort!(data)) 

      data = copy(data_orig) 
      push!(threadsort_times, @elapsed threadedsort!(data)) 

      @show (sz, sort_times[end], threadsort_times[end]) 
    end 
    return sizes, threadsort_times, sort_times 
end 

sizes, threadsort_times, sort_times = evaluate_timing(0:28) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"]) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10) 

マイ結果:は、8つのスレッドを使用して

は、ここに私のコードです。

plot normal scale plot loglog scale

Iは初期長い時間が取られ、そのビット 1024上の注意を無視することができ、驚くほど低いことがクロスオーバーポイントを発見 - それは最初のためにコンパイルされたコードであるJITあります走る

BenchmarkToolsを使用すると、奇妙な結果が再現されません。 ベンチマークツールでは、最初のタイミングがカウントされなくなりました。 しかし、上記のベンチマークコードのように、通常のタイミングコードを使用すると、一貫して再現します。 私はそれを殺す何かをやっていると思い

ビッグおかげで、私はさらにそこ項目のわずか1%です一意である場合はテストしている

1

私の解析コードの誤りを指摘し、誰@xiaodaiする方法をいくつかのマルチスレッドまた、1:1_000_000からサンプリングするenter image description here。結果は

enter image description here 関数evaluate_timing_w_repeats(範囲) サイズ= INT [] threadsort_times =のfloat64 [] sort_times =のfloat64 [] 2でSZため ^(範囲)を収集 data_orig =ランド(RAND(未満でありますInt、sz÷100)、sz) プッシュ!(サイズ、sz)

  data = copy(data_orig) 
      push!(sort_times,  @elapsed sort!(data)) 

      data = copy(data_orig) 
      push!(threadsort_times, @elapsed threadedsort!(data)) 

      @show (sz, sort_times[end], threadsort_times[end]) 
    end 
    return sizes, threadsort_times, sort_times 
end 

sizes, threadsort_times, sort_times = evaluate_timing_w_repeats(7:28) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"]) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10) 
savefig("sort_with_repeats.png") 

function evaluate_timing1m(range) 
    sizes = Int[] 
    threadsort_times = Float64[] 
    sort_times = Float64[] 
     for sz in 2.^collect(range) 
      data_orig = rand(1:1_000_000, sz) 
      push!(sizes, sz) 

      data = copy(data_orig) 
      push!(sort_times,  @elapsed sort!(data)) 

      data = copy(data_orig) 
      push!(threadsort_times, @elapsed threadedsort!(data)) 

      @show (sz, sort_times[end], threadsort_times[end]) 
    end 
    return sizes, threadsort_times, sort_times 
end 

sizes, threadsort_times, sort_times = evaluate_timing1m(7:28) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"]) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time sampel from 1:1_000_000", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10) 
savefig("sort1m.png") 
関連する問題