2016-08-12 13 views
1

私はこの質問に相当するPysparkを探しています:How to get the number of elements in partition?各pyspark RDDパーティションの要素の数をカウント

具体的には、Pyspark RDDまたはデータフレームの各パーティションの要素数をプログラムでカウントしたいと思います(この情報はSpark Web UIで利用できます)。

「はAttributeError: 『NoneType』オブジェクトが属性 『_jvm』を持っていない」:この試行結果

df.foreachPartition(lambda iter: sum(1 for _ in iter))

私はメモリにイテレータの内容を収集する必要はありません。

答えて

4

質問している場合:イテレータを反復処理せずにイテレータの要素数を取得できますか?答えはNoです。

しかし、我々はあなたが言及した記事のように、メモリ内に格納する必要はありません:あなたのコードだけでは、解決に非常に近い

def count_in_a_partition(idx, iterator): 
    count = 0 
    for _ in iterator: 
    count += 1 
    return idx, count 

data = sc.parallelize([ 
    1, 2, 3, 4 
], 4) 

data.mapPartitionsWithIndex(count_in_a_partition).collect() 

EDIT

注意をmapPartitionsはイテレータを返す必要があります。

def count_in_a_partition(iterator): 
    yield sum(1 for _ in iterator) 

data.mapPartitions(count_in_a_partition).collect() 
+0

ありがとうございます@ShuaiYuan。いいえ、私はカウントを得るために繰り返す必要があることを知っています。 あなたの最初の解決策が私に役立ちます! しかし、Spark 1.5.0(私の組織のクラスタ)の私の元の試みと同じAttributeErrorを、あなたの例で作成した "データ" rddに対してもスローします。 AttributeError: 'NoneType'オブジェクトに '_jvm'属性がありません。しかし、1.6.0または1.5.2を実行しているSpark Community Editionでは、両方のソリューションが動作します。おそらく私のローカルCDHディストリビューションについて奇妙な何か? –

+0

することができます。残念ながら私はSpark 1.5.0をテストしていません。 – ShuaiYuan

関連する問題