2016-11-29 17 views
2

以下のpysparkで書き込んだカウンタが、常に正しい結果を提供するとは限りません。グローバルカウンタに関連していますか?pysparkのグローバルカウンタ

def increment_counter(): 
    global counter 
    counter += 1 

def get_number_of_element(rdd): 
    global counter 
    counter = 0 
    rdd.foreach(lambda x:increment_counter()) 
    return counter 

答えて

4

グローバル変数はドライバノードでのみ定義されています。つまり、ローカルホスト上で実行するまで正常に動作します。 ジョブを複数のプロセスに配布するとすぐに、counter変数にアクセスすることはなく、独自のプロセスで新しいプロセスを作成します。したがって、最終結果にはドライバプロセスで行われた増分のみが含まれます。

あなたが探しているのは非常に一般的な使い方ですが、Sparkのアキュムレータ機能の対象です。アキュムレータはプロセスの最後に配布され収集されるため、合計にはドライバノードだけでなくすべてのノードのインクリメントが含まれます。

Accumulators - Spark Programming Guide

+0

グレート!どうもありがとうございます! – xxx222