2016-04-13 27 views
1

私は新しい分野であるGPUで計算を実行するためにC++とcuda/thrustを使用しています。残念ながら、私のコード(以下のMCVE)は効率的ではありませんので、最適化する方法を知りたいと思います。コードは次の操作を実行します。Cuda Thrust - sort_by_key、merge_by_key、およびreduce_by_keyを使用してコードを最適化する方法

2つのキーベクトルと2つの値ベクトルがあります。キーベクトルは基本的に上三角行列のiとj(この例ではサイズ4x4)を含んでいます。

key1 {0, 0, 0, 1, 1, 2} value1: {0.5, 0.5, 0.5, -1.0, -1.0, 2.0} 
key2 {1, 2, 3, 2, 3, 3} value2: {-1, 2.0, -3.5, 2.0, -3.5, -3.5} 

タスクは、同じキーを持つすべての値を超える合計することです。これを達成するため、sort_by_keyを使用して2番目の値ベクトルをソートしました。結果は次のとおりです。

key1 {0, 0, 0, 1, 1, 2} value1: {0.5, 0.5, 0.5, -1.0, -1.0, 2.0} 
key2 {1, 2, 2, 3, 3, 3} value2: {-1.0, 2.0, 2.0, -3.5, -3.5, -3.5} 

はその後、私は以前よりも大きな二重サイズで、新しいキーと値のベクトルにつながるmerge_by_keyを使用して、両方の値ベクトルを、吸収合併しました。

key_merge {0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3} 
value_merge {0.5, 0.5, 0.5, -1.0, -1.0, -1.0, 2.0, 2.0, 2.0, -3.5, -3.5, -3.5} 

最後の手順では、reduce_by_keyを使用して、すべての値を同じキーで合計します。結果は次のとおりです。

key {0, 1, 2, 3} value: {1.5, -3.0, 6.0, -10.5} 

この操作を実行し、それ以下のコードは、ゆっくりと静かで、私は大きいサイズのパフォーマンスが悪いことになることを恐れています。どのように最適化することができますか? sort_by_key、merge_by_key、reduce_by_keyを融合することは可能ですか?あらかじめsort_by_keyから結果のキーベクトルを知っているので、値ベクトルを「古いキーから新しいキーに」変換することは可能ですか?それらを減らす前に2つのベクトルをマージするのは理にかなっていますか?値/キーベクトルの各ペアに対してreduce_by_keyを別々に使用する方が速いのですか?事実を使用してreduce_by_key計算をスピードアップすることは可能ですか?ここでは異なるキー値の数が分かり、等価キーの数は常に同じですか?

#include <stdio.h> 
#include <thrust/host_vector.h> 
#include <thrust/device_vector.h> 
#include <thrust/sort.h> 
#include <thrust/reduce.h> 
#include <thrust/merge.h> 

int main(){ 
    int key_1[6] = {0, 0, 0, 1, 1, 2}; 
    int key_2[6] = {1, 2, 3, 2, 3, 3}; 
    thrust::device_vector<double> k1(key_1,key_1+6); 
    thrust::device_vector<double> k2(key_2,key_2+6); 

    double value_1[6] = {0.5, 0.5, 0.5, -1.0, -1.0, 2.0}; 
    double value_2[6] = {-1, 2.0, -3.5, 2.0, -3.5, -3.5}; 
    thrust::device_vector<double> v1(value_1,value_1+6); 
    thrust::device_vector<double> v2(value_2,value_2+6); 

    thrust::device_vector<double> mk(12); 
    thrust::device_vector<double> mv(12); 
    thrust::device_vector<double> rk(4); 
    thrust::device_vector<double> rv(4); 

    thrust::sort_by_key(k2.begin(), k2.end(), v2.begin()); 
    thrust::merge_by_key(k1.begin(), k1.end(), k2.begin(), k2.end(),v1.begin(), v2.begin(), mk.begin(), mv.begin()); 
    thrust::reduce_by_key(mk.begin(), mk.end(), mv.begin(), rk.begin(), rv.begin()); 

    for (unsigned i=0; i<4; i++) { 
    double tmp1 = rk[i]; 
    double tmp2 = rv[i]; 
    printf("key value %f is related to %f\n", tmp1, tmp2); 
    } 
    return 0; 
} 

結果:

key value 0.000000 is related to 1.500000 
key value 1.000000 is related to -3.000000 
key value 2.000000 is related to 6.000000 
key value 3.000000 is related to -10.500000 

答えて

2

ここで私はあなたの順序よりも速くかもしれないと思う一つの可能​​なアプローチがあります。重要なアイデアは、事前に注文を知っているところでデータのソートを避けたいということです。データをソートするのではなく、私たちが持っている注文知識を活用することができれば、単にそれを希望の配列に並べ替えることができます。

データについていくつかの観察をしましょう。

  1. 連結ベクトルは同じ数が含まれます:あなたのkey1key2は、実際には上三角行列のI、Jの指標であるならば、我々はこれら2つのベクトルの連結に関するいくつかの観察を行うことができます各キーあなたの場合、ベクトルには3つの0キー、3つの1キー、3つの2のキー、3つの3のキーが含まれています。このパターンは、行列の次元に依存しない任意の上三角パターンに対して成立するはずです。従って、上三角である次元Nの行列は、連結されたインデックスベクトル内にN個のセットのキーを有し、各セットはN-1のような要素からなる。

  2. 連結されたベクトルでは、(行列次元Nに基づいて)キーの一貫した順序付けを発見/確立することができ、従来のソート操作に頼ることなく類似キーグループ化された順序でベクトルを並べ替えることができます。

我々は上記の2つのアイデアを組み合わせた場合、我々はおそらくthrust::reduce_by_key操作に続いて、活動をマージ/ソートを置き換えるために、いくつかの散布作業と全体の問題を解決することができます。散布操作は、thrust::copyで適切なthrust::permutation_iteratorと適切なインデックス計算ファンクタを組み合わせて行うことができます。並べ替えられた並べ替え済みのベクトルkeyがどのように表示されるか正確に知っているので(次元:4の例:{0,0,0,1,1,1,2,2,2,3,3,3})、並べ替えを明示的に実行する必要はありません。ただし、同じマッピングを使用してベクトルvalueを並べ替える必要があります。それでは、そのマッピングするための演算を開発しましょう:

dimension (N=)4 example 

vector index:   0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11 
desired (group) order: 0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3 
concatenated keys:  0, 0, 0, 1, 1, 2, 1, 2, 3, 2, 3, 3 
group start idx:  0, 0, 0, 3, 3, 6, 3, 6, 9, 6, 9, 9 
group offset idx:  0, 1, 2, 0, 1, 0, 2, 1, 0, 2, 1, 2 
destination idx:  0, 1, 2, 3, 4, 6, 5, 7, 9, 8,10,11 


dimension (N=)5 example 

vector index:   0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15,16,17,18,19 
desired (group) order: 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4 
concatenated keys:  0, 0, 0, 0, 1, 1, 1, 2, 2, 3, 1, 2, 3, 4, 2, 3, 4, 3, 4, 4 
group start idx:  0, 0, 0, 0, 4, 4, 4, 8, 8,12, 4, 8,12,16, 8,12,16,12,16,16 
group offset idx:  0, 1, 2, 3, 0, 1, 2, 0, 1, 0, 3, 2, 1, 0, 3, 2, 1, 3, 2, 3 
destination idx:  0, 1, 2, 3, 4, 5, 6,10, 7, 8,11,14, 9,12,15,17,13,16,18,19 

我々は、それぞれの場合に、宛先インデックス(すなわち、所望のグループの順序のために選択されたキーまたは値を移動させる位置)がに等しいことを観察することができますグループ開始インデックス+グループオフセットインデックス。グループ開始インデックスは、単にキーに(N-1)を掛けたものです。グループオフセットインデックスは、上部または下部の三角形のインデックスパターンに似たパターンです(連結ベクトルの各半分について、2つの異なるインカネーションで)。連結されたキーは、key1key2ベクトルの連結です(この連結は実質的にpermutation_iteratorを使用して作成されます)。所望のグループ順序は先験的に知られており、単にN-1個の要素からなるN個のグループを有する整数グループのシーケンスである。連結されたキーベクトルのソートされたバージョンと同等です。したがって、ファンクタ内の宛先インデックスを直接計算することができます。グループを作成するための

は、インデックスパターンをオフセット、我々はあなたの二つの重要なベクトルを引く(と追加の1を減算)することができます:ここでは

key2:     1, 2, 3, 2, 3, 3 
key1:     0, 0, 0, 1, 1, 2 
key1+1:    1, 1, 1, 2, 2, 3 
p1 = key2-(key1+1): 0, 1, 2, 0, 1, 0 
p2 = (N-2)-p1:   2, 1, 0, 2, 1, 2 
grp offset idx=p1|p2: 0, 1, 2, 0, 1, 0, 2, 1, 0, 2, 1, 2 

はあなたの例を使用して上記の概念を実証する、完全に加工した例でありますデータ:

$ cat t1133.cu 
#include <thrust/host_vector.h> 
#include <thrust/device_vector.h> 
#include <thrust/reduce.h> 
#include <thrust/copy.h> 
#include <thrust/transform.h> 
#include <thrust/iterator/transform_iterator.h> 
#include <thrust/iterator/permutation_iterator.h> 
#include <thrust/iterator/zip_iterator.h> 
#include <thrust/iterator/counting_iterator.h> 
#include <iostream> 

// "triangular sort" index generator 
struct idx_functor 
{ 
    int n; 
    idx_functor(int _n): n(_n) {}; 
    template <typename T> 
    __host__ __device__ 
    int operator()(const T &t){ 
    int k1 = thrust::get<0>(t); 
    int k2 = thrust::get<1>(t); 
    int id = thrust::get<2>(t); 
    int go,k; 
    if (id < (n*(n-1))/2){ // first half 
     go = k2-k1-1; 
     k = k1; 
     } 
    else { // second half 
     go = n-k2+k1-1; 
     k = k2; 
     } 
    return k*(n-1)+go; 
    } 
}; 


const int N = 4; 
using namespace thrust::placeholders; 

int main(){ 
    // useful dimensions 
    int d1 = N*(N-1); 
    int d2 = d1/2; 
    // iniitialize keys 
    int key_1[] = {0, 0, 0, 1, 1, 2}; 
    int key_2[] = {1, 2, 3, 2, 3, 3}; 
    thrust::device_vector<int> k1(key_1, key_1+d2); 
    thrust::device_vector<int> k2(key_2, key_2+d2); 
    // initialize values 
    double value_1[] = {0.5, 0.5, 0.5, -1.0, -1.0, 2.0}; 
    double value_2[] = {-1, 2.0, -3.5, 2.0, -3.5, -3.5}; 
    thrust::device_vector<double> v(d1); 
    thrust::device_vector<double> vg(d1); 
    thrust::copy_n(value_1, d2, v.begin()); 
    thrust::copy_n(value_2, d2, v.begin()+d2); 
    // reorder (group) values by key 
    thrust::copy(v.begin(), v.end(), thrust::make_permutation_iterator(vg.begin(), thrust::make_transform_iterator(thrust::make_zip_iterator(thrust::make_tuple(thrust::make_permutation_iterator(k1.begin(), thrust::make_transform_iterator(thrust::counting_iterator<int>(0), _1%d2)), thrust::make_permutation_iterator(k2.begin(), thrust::make_transform_iterator(thrust::counting_iterator<int>(0), _1%d2)), thrust::counting_iterator<int>(0))), idx_functor(N)))); 
    // sum results 
    thrust::device_vector<double> rv(N); 
    thrust::device_vector<int> rk(N); 
    thrust::reduce_by_key(thrust::make_transform_iterator(thrust::counting_iterator<int>(0), _1/(N-1)), thrust::make_transform_iterator(thrust::counting_iterator<int>(d1), _1/(N-1)), vg.begin(), rk.begin(), rv.begin()); 
    // print results 
    std::cout << "Keys:" << std::endl; 
    thrust::copy_n(rk.begin(), N, std::ostream_iterator<int>(std::cout, ", ")); 
    std::cout << std::endl << "Sums:" << std::endl; 
    thrust::copy_n(rv.begin(), N, std::ostream_iterator<double>(std::cout, ", ")); 
    std::cout << std::endl; 
    return 0; 
} 
$ nvcc -std=c++11 -o t1133 t1133.cu 
$ ./t1133 
Keys: 
0, 1, 2, 3, 
Sums: 
1.5, -3, 6, -10.5, 
$ 

正味の効果は、あなたのthrust::sort_by_keythrust::merge_by_key操作がより効率的である必要があり、単一のthrust::copyの操作に置き換えられているということです。

関連する問題