2016-04-20 6 views
2

私はpysparkのDense行列(100 * 100)を持っていますが、repartitionを10個の行にそれぞれ10個のグループに分けたいと思います。pysparkの密行列を再分割する

from pyspark import SparkContext, SparkConf 
from pyspark.mllib import * 
sc = SparkContext("local", "Simple App") 
dm2 = Matrices.dense(100, 100, RandomRDDs.uniformRDD(sc, 10000).collect()) 
newRdd = sc.parallelize(dm2.toArray()) 
rerdd = newRdd.repartition(10) 

100個の要素を含むrerddに上記のコードをもたらします。私はこの行列dm2を行単位の分割ブロック(例えば、パーティション内に10行)として提示したいと思います。

+0

通常のnumpy行列を使用して試したところ、RDDに変換して分割しました。 http://stackoverflow.com/questions/36739687/partition-a-matrix-rdd-in-pyspark – Mitty

答えて

1

私はあまり意味がありませんが、たとえば、パーティションごとにパーティションと行のこの

mat = Matrices.dense(100, 100, np.arange(10000)) 

n_par = 10 
n_row = 100 

rdd = (sc 
    .parallelize(
     # Add indices 
     enumerate(
      # Extract and reshape values 
      mat.values.reshape(n_row, -1))) 
    # Partition and sort by row index 
    .repartitionAndSortWithinPartitions(n_par, lambda i: i // n_par)) 

確認番号のような何かを行うことができます:最初の行は、所望の含まれている場合

rdd.glom().map(len).collect() 
## [10, 10, 10, 10, 10, 10, 10, 10, 10, 10 

チェックデータ:

assert np.all(rdd.first()[1] == np.arange(100)) 
+0

'rdd.getNumPartitions()'が100を返します – Mitty

+0

私のためにうまく動作します。 – zero323