2017-11-17 3 views
6

両方COL /行方向近傍の意識する横断するのI /各行内、Parallellizableアルゴリズムは、2Dマトリックスが

、かなり大きなN * Nの整数行列Matrix2Dを(十分なメモリを想定)を有します列の場合、値が右下/隣の場合、要素のインデックスcol/rowのインデックスを記録する必要があります。

、私は理想的にはOMPによって並列化可能な最適なアルゴリズムを見つけたいと考えています。

だから、最終的に私は

インナー std::vector<int>が行/ COL指数を記録
std::vector<std::vector<int>> RowWiseDiscontinuity(N);// N= #of rows 
std::vector<std::vector<int>> ColWiseDiscontinuity(N);// N= #of cols 

、のようないくつかのデータ構造を持つことになります。

私はここにシリアルバージョンを入れましたが、OMPを並列化するのは難しいと感じています...誰かがompでこの2Dマトリックスを越えて横断を実装する方法をいくつか考えていますか?

コードスニペット

std::vector<std::vector<int>> RowWiseDiscontinuity(N);// N= #of rows 
std::vector<std::vector<int>> ColWiseDiscontinuity(N);// N= #of cols 
std::vector<int> TempX1; 
std::vector<int> TempX2; 
for (int y=0; y<N; ++y) 
{ 
    TempX1.clear(); 
    for (int x =0; x<N; ++x) 
    { 
     int value = Matrix2D(x,y); 
     TempX1.push_back(value); 
    } 

    auto iter1 = TempX1.begin(); 
    auto iter2 = TempX2.begin(); 

    if (y>0) 
    for (int x =0; x<N; ++x) 
    { 
     if (*iter1 !=*(iter1+1)) 
     { 
      RowWiseDiscontinuity[y].push_back(x); //Critical for OMP 
     } 
     ++iter1; 
     ++iter2; 
     if (*iter1 != *iter2) 
     { 
      ColWiseDiscontinuity[x].push_back(y); //Critical for OMP 
     } 
    } 

    TempX2.swap(TempX1); // proceed to next row, remember previous 

} 
+0

スレッドローカルベクトルにペア(x、y)または(y、x)を格納し、内部ループの終了後にRow/ColWiseDiscontinuityに追加することについて考えましたか?この方法で少量の追加ストレージしか使用せず、Row/ColWiseDiscontinuityへの挿入はパフォーマンスに大きな影響を与えることなく順番に実行できます(私は不連続性がまばらにしか起こらないと仮定しています) –

+0

'2Dベクトルマトリックス'' 1D vector'にストライド値を使用して行に何個の列をマークするか? –

+0

'TempX1'、' TempX2'を削除し、 'Matrix2D'から直接データを使うことができます。うまくいけば、外側ループを並列化することができます。しかし、 'Matrix2D'がキャッシュに収まらない場合は、メモリー帯域幅に縛られている可能性があります。 – geza

答えて

2

私は、列と行の両方に最も近い隣を保持する別の配列を作成します。これは明らかに最初のパスとして行われなければならないでしょう。私はあなたが望むインデックスを保持するペアの2次元配列を作成することをお勧めします。 2つのベクトルの代わりに、私はペアのベクトルを実行します。ペアは並列化が可能で、簡単にソートできます。

+3

私はこれがあなたが欲しいものなのかどうか完全に分かっていないので、downvotingの前に私を修正してください。ありがとう。 – NL628

2

は、マトリックス上の(異なるスレッドで実行することができる)は、2つのパス、列方向不連続の行ごとの不連続性および他のための1つを実行します。次のように

ロウパスが見える:

for (int y = 0; y < N; ++y) // Can be parallelized 
{ 
    for (int x = 0; x < N - 1; ++x) 
    { 
     if(Matrix(x, y) != Matrix(x + 1, y)) 
      RowWiseDiscontinuity[y].push_back(x); 
    } 
} 

列パスが類似している。両方の場合において

for (int x = 0; x < N; ++x) // Can be parallelized 
{ 
    for (int y = 0; y < N - 1; ++y) 
    { 
     if(Matrix(x, y) != Matrix(x, y + 1)) 
      ColWiseDiscontinuity[x].push_back(y); 
    } 
} 

外側のループが並列化することができます。 異なる要素の/ColWiseDiscontinuityは、外側ループの各反復において突然変異し、したがって、データ競争を防止する。パス自体は異なるスレッドで実行できます。

サイドノートとしては、行優先および列優先順位の両方をマトリックスを格納し、適切な場合、各注文を使用して(メモリを犠牲にして)キャッシュミスを減少させることによって、さらに、このアルゴリズムを最適化することができます。行優先順序では、要素(x + 1, y)は常に(x, y)の次にあります。同様のことが、要素の(x, y + 1)についても列優先順位で当てはまる。

+2

私は原因不明のdownvoteを得ることはできません、私は私の解決策で何が間違っているかを見ません。 – EvilTak

1

ここでは、隣接する対角隣接を見つける基本テストを行い、4x4単位行列を使用して結果を記録するアルゴリズムがあります。これには、OMPまたは並列コンピューティングの使用は含まれません。しかしこれは、使用するのに十分シンプルなMxN行列の汎用クラステンプレートです。内容をベクトルのベクトルに格納する代わりに、私は単一の1Dベクトルにデータを平坦化し、テンプレートのインスタンス化時にすでにメモリの量が保存されています。関数テンプレートを使用して、インデックス(M,N)または(x,y)を返す行列内の要素と、結果が真または偽であるかどうかを比較しています。ここではx-yインデックス&の関係を含む構造体を使用します。隣人をチェックする経験則は、最後の列&の行列の最後の行を見ることを避ける。なぜなら、父親は右にも奥にも要素がないからである。これは主関数内に見ることができる。これは、クラスを適用しようとする可能性のある場所に、&関数をOMPライブラリに組み込むのに役立ちます。

template<unsigned Col, unsigned Row> 
class Matrix2D { 
public: 
    const unsigned col_size = Col; 
    const unsigned row_size = Row; 
    const unsigned stride_ = col_size; 
    const unsigned matrix_size = col_size * row_size; 

private: 
    std::vector<int> data_; 

public: 
    Matrix2D() { 
     data_.resize(matrix_size); 
    } 

    void addElement(unsigned x, unsigned y, int val) { 
     data_[(x * col_size + y)] = val; 
    } 

    /*int getElement(unsigned x, unsigned y) { 
     int value = data_[(x * col_size + y)]; 
     return value; 
    }*/ 

    int getElement(unsigned idx) { 
     return data_[idx]; 
    } 
}; 

struct Neighbor { 
    unsigned indexCol; 
    unsigned indexRow; 
    bool  notSame; 
}; 


template<unsigned Col, unsigned Row> 
void compareMatrixDiagonals(Matrix2D<Col, Row>& mat, Neighbor& n, unsigned colIdx, unsigned rowIdx); 

int main() { 

    Matrix2D<4, 4> mat4x4; 
    mat4x4.addElement(0, 0, 1); 
    mat4x4.addElement(0, 1, 0); 
    mat4x4.addElement(0, 2, 0); 
    mat4x4.addElement(0, 3, 0); 

    mat4x4.addElement(1, 0, 0); 
    mat4x4.addElement(1, 1, 1); 
    mat4x4.addElement(1, 2, 0); 
    mat4x4.addElement(1, 3, 0); 

    mat4x4.addElement(2, 0, 0); 
    mat4x4.addElement(2, 1, 0); 
    mat4x4.addElement(2, 2, 1); 
    mat4x4.addElement(2, 3, 0); 

    mat4x4.addElement(3, 0, 0); 
    mat4x4.addElement(3, 1, 0); 
    mat4x4.addElement(3, 2, 0); 
    mat4x4.addElement(3, 3, 1); 

    unsigned idx = 0; 
    for (unsigned i = 0; i < mat4x4.matrix_size; i++) { 
     std::cout << mat4x4.getElement(i) << " "; 
     idx++; 

     if (idx == 4) { 
      std::cout << "\n"; 
      idx = 0; 
     } 
    } 
    std::cout << "\n";  

    unsigned colIdx = 0; 
    unsigned rowIdx = 0; 
    std::vector<Neighbor> neighbors; 
    Neighbor n; 

    // If we are in the last col or row we can ignore 
    // (0,3),(1,3),(2,3),(3,3),(3,0),(3,1),(3,2), {*(3,3)* already excluded} 
    // This is with a 4x4 matrix: we can substitute and use LastCol - LastRow 
    // for any size MxN Matrix. 
    const unsigned LastCol = mat4x4.col_size - 1; 
    const unsigned LastRow = mat4x4.row_size - 1; 

    for (unsigned i = 0; i < LastCol; i++) { 
     for (unsigned j = 0; j < LastRow; j++) { 
      compareMatrixDiagonals(mat4x4, n, i, j); 
      neighbors.push_back(n); 
     } 
    } 

    for (unsigned i = 0; i < neighbors.size(); i++) { 
     std::cout << "(" << neighbors[i].indexCol 
      << "," << neighbors[i].indexRow 
      << ") " << neighbors[i].notSame 
      << "\n"; 
    } 

    std::cout << "\nPress any key & enter to quit." << std::endl; 
    char c; 
    std::cin >> c; 

    return 0; 
} 

template<unsigned Col, unsigned Row> 
void compareMatrixDiagonals(Matrix2D<Col, Row>& mat, Neighbor& N, unsigned colIdx, unsigned rowIdx) { 
    unsigned firstIdx = (colIdx * mat.col_size + rowIdx); 
    unsigned nextIdx = ((colIdx + 1) * mat.col_size + (rowIdx + 1)); 
    if (mat.getElement(firstIdx) != mat.getElement(nextIdx)) { 
     N.indexCol = colIdx; 
     N.indexRow = rowIdx; 
     N.notSame = true;   
    } else { 
     N.indexCol = colIdx; 
     N.indexRow = rowIdx; 
     N.notSame = false;  
    } 
} 
関連する問題