0

scikit-learnクラスKNeighborsClassifierを拡張しようとしています。隣接するノード間の距離を計算する別の方法を導入しています(興味があればhereを参照)。並列実装がシリアルよりも遅いのはなぜですか? (Pythonマルチプロセッシングモジュール)

並列スキームは以下の通りである:我々は集合Aのすべての要素間の距離を計算し、A(次々順次いずれかを撮影したもの)の各要素のために、Bを設定し、すべての要素までの距離を計算することを考える B と並行して。 時間のかかる操作は、2つの要素間の距離を計算するため、各プロセスはこの基本的な操作を実行する必要があります。

問題は、マシンと使用されているコアの数にかかわらず、同期呼び出しと非同期呼び出しの両方を使用すると、シリアル実行(Pythonのmultiprocessingモジュールを使用)よりもはるかに低速です。

これは、バックグラウンドで通信される共有変数の使用に関係すると考えられます。問題は、どの変数が伝達されているのか、どのように回避できるのでしょうか?

コード:すべてのための

class WordMoversKNN(KNeighborsClassifier): 
"""K nearest neighbors classifier using the Word Mover's Distance. 
Parameters 
---------- 

W_embed : array, shape: (vocab_size, embed_size) 
    Precomputed word embeddings between vocabulary items. 
    Row indices should correspond to the columns in the bag-of-words input. 
n_neighbors : int 
    Number of neighbors to use by default for :meth:`k_neighbors` queries. 
n_jobs : int 
    The number of parallel jobs to run for Word Mover's Distance computation. 
    If ``-1``, then the number of jobs is set to the number of CPU cores. 
verbose : int, optional 
    Controls the verbosity; the higher, the more messages. Defaults to 0. 

""" 

def __init__(self, W_embed, n_neighbors=1, n_jobs=1, verbose=5): 
    self.W_embed = W_embed 
    self.verbose = verbose 
    if n_jobs == -1: 
     n_jobs = mp.cpu_count() 

    super(WordMoversKNN, self).__init__(n_neighbors=n_neighbors, n_jobs=n_jobs, metric='precomputed', algorithm='brute') 

def _wmd(self, i, row, X_train): 
    """Compute the WMD between training sample i and given test row. 

    Assumes that `row` and train samples are sparse BOW vectors summing to 1. 
    """ 
    union_idx = np.union1d(X_train[i].indices, row.indices) 
    W_minimal = self.W_embed[union_idx] 
    W_dist = euclidean_distances(W_minimal) 
    bow_i = X_train[i, union_idx].A.ravel() 
    bow_j = row[:, union_idx].A.ravel() 
    return emd(bow_i, bow_j, W_dist) 

def _wmd_row(self, row, X_train): 
    """Wrapper to compute the WMD of a row with all training samples. 

    Assumes that `row` and train samples are sparse BOW vectors summing to 1. 
    Useful for parallelization. 
    """ 
    n_samples_train = X_train.shape[0] 
    return [self._wmd(i, row, X_train) for i in range(n_samples_train)] 

def _pairwise_wmd(self, X_test, X_train=None, ordered=True): 
    """Computes the word mover's distance between all train and test points. 

    Parallelized over rows of X_test. 

    Assumes that train and test samples are sparse BOW vectors summing to 1. 

    Parameters 
    ---------- 
    X_test: scipy.sparse matrix, shape: (n_test_samples, vocab_size) 
     Test samples. 

    X_train: scipy.sparse matrix, shape: (n_train_samples, vocab_size) 
     Training samples. If `None`, uses the samples the estimator was fit with. 
    ordered: returns result keeping the order of the rows in dist (following X_test). 
     Otherwise, the rows of dist follow a potentially random order which does not follow the order 
     of indices in X_test. However, computation is faster in this case (asynchronous parallel execution) 

    Returns 
    ------- 
    dist : array, shape: (n_test_samples, n_train_samples) 
     Distances between all test samples and all train samples. 

    """ 
    n_samples_test = X_test.shape[0] 

    if X_train is None: X_train = self._fit_X 

    if (self.n_jobs == 1) or (n_samples_test < 2*self.n_jobs): # to avoid parallelism overhead for small test samples 
     dist = [ self._wmd_row(test_sample , X_train) for test_sample in X_test ] 
    else: 
     if self.verbose: 
      print("WordMoversKNN set to use {} parallel processes".format(self.n_jobs)) 
     if ordered: 
      dist = Parallel(n_jobs=self.n_jobs, verbose=self.verbose)(delayed(self._wmd_row) (test_sample, X_train) for test_sample in X_test) 
     else: # Asynchronous call is faster but returns results in random order    
      pool = mp.Pool(processes=self.n_jobs) 

      results = [pool.apply_async(self._wmd_row, args=(test_sample, X_train)) for test_sample in X_test] 
      dist = [p.get() for p in results] 
    return np.array(dist) 


def calculate(self, X): 
    """Predict the class labels for the provided data 
    Parameters 
    ---------- 
    X : scipy.sparse matrix, shape (n_test_samples, vocab_size) 
     Test samples. 
    Returns 
    ------- 
    y : array of shape [n_samples] 
     Class labels for each data sample. 
    """ 
    X = check_array(X, accept_sparse='csr', copy=True) 
    X = normalize(X, norm='l1', copy=False) 
    dist = self._pairwise_wmd(X) 
    # A matrix of distances given to predict in combination with metric = 'precomputed' 
    # means that no more distance calculations take place. Neighbors are found simply by sorting 
    return super(WordMoversKNN, self).predict(dist) 
+1

パラレルは、実際には遅い場合があります。 1つのプロセッサ(例えば、単一コアCPU)しかないので、処理は並列に処理することができない。並行して、コンテキストを切り替えるために必要な時間はわずかであり、処理されたシリアルの場合はこの時間は必要ありません。 – dee

+0

@deeありがとうございます。問題は、私がいくつかの(マルチコア)マシンで同じコードを実行し、異なる数のコアを使用しようとしたことです。しかし、実行は常にずっと遅いです。同時に、いくつかのおもちゃの例(配列の並べ替えや数学的関数を配列のリストに適用する)で同じモジュール(マルチプロセッシング)を試すと、スピードアップが見えます。 – Ataxias

+0

'IPCコスト(' test_sample'と 'X_train'を子プロセスにコピーし、' _wmd_row'から返されたリストを親に戻すことを意味する)が非常に高い場合、 'multiprocessing'はシリアル実行よりも遅くなる可能性があります。 IPCは非常に遅いため、非常に大きなオブジェクトの場合、計算を並列化することで得られる利益よりも優れています。非常に高価ではない計算を行う必要がある巨大なリストを持っているなら、 'multiprocessing'は役に立たないかもしれません。 – dano

答えて

0
、毎回完全 X_trainを渡す必要

主な問題は、行列X_test新しいプロセスの各行ためがスポーンしたこと、ならびに他の変数(例えば、self.X_embed)プロセス。これらの変数のピッキングとディスパッチは、そのサイズのために非常に時間がかかります。私はサイズX_test.shape[0]//n_jobsn_jobsチャンクで行列X_testを分割するとき は、私は、全体的なだけn_jobsプロセスを生成し、変数にn_jobs回の代わりX_test.shape[0]時間を渡すために持つ、驚異的なスピードアップを得ました。 しかし、通信する必要がある変数のサイズのため、このタイプの問題では、データの並列処理はタスクの並列処理よりもはるかに適切なアプローチであると私は考えています。したがって、mpi4pyを使用して、 self.W_embed,X_trainおよびX_test行列を作成し、計算結果のみを伝達します。

関連する問題