2016-11-30 7 views
10

の一覧を表示するには、私はRDD構造スパーク:RDDは

RDD[(String, String)] 

を持っていると私は2つのリスト(RDDの各次元に1つ)を作成します。

私はrdd.foreach()を使用して2つのListBuffersを埋め込み、リストに変換しようとしましたが、繰り返し後にBufferListsが空であるため、各ノードは独自のListBufferを作成します。どうしたらいいですか?

EDIT:私のアプローチ

val labeled = data_labeled.map { line => 
    val parts = line.split(',') 
    (parts(5), parts(7)) 
}.cache() 

var testList : ListBuffer[String] = new ListBuffer() 

labeled.foreach(line => 
    testList += line._1 
) 
    val labeledList = testList.toList 
    println("rdd: " + labeled.count) 
    println("bufferList: " + testList.size) 
    println("list: " + labeledList.size) 

と結果は次のとおりです。

rdd: 31990654 
bufferList: 0 
list: 0 
+1

を何のコードで更新してくださいいくつかの入力データサンプルと期待される出力を試しました!あなたの質問は私にはあまり明確ではありません。 – eliasah

答えて

9

あなたが実際には2つのリスト作成したい場合は - 意味を、あなたはすべての分散データに収集することにしたいですドライバーアプリケーション(遅くなる可能性がありますまたは) - あなたはcollectを使用して、結果に簡単なmap操作を使用できます:

また
val list: List[(String, String)] = rdd.collect().toList 
val col1: List[String] = list.map(_._1) 
val col2: List[String] = list.map(_._2) 

- あなたは2 RDDSに「スプリット」あなたのRDDにしたい場合 - それは、データを収集せずにかなり似ています:

rdd.cache() // to make sure calculation of rdd is not repeated twice 
val rdd1: RDD[String] = rdd.map(_._1) 
val rdd2: RDD[String] = rdd.map(_._2) 

第3の選択肢は、これら2つのRDDSに最初のマップにあるとそれらのそれぞれを収集しますが、最初のオプションとあまり変わらず、同じリスクと制限があります。

+0

@Yuriyここでは、どのようにブロードキャスト変数(読み込み専用)がありますか?もっと詳しく説明できますか? – avr

+0

@avr ListBufferは可変であり、 '+ ='は内部状態を変更し、新しい参照を作成しません。しかし、あなたは疑問に思っています。不変なステートメント(何らかの操作のために参照を変更する)が何か(シリアライズ可能)でそれをラップする必要があります。 Listの簡単な例: 'val testList = sc.broadcast(新しいSerializable {var list = List.empty [String]})'、およびmutate内部状態の後。 – Yuriy

+0

@Yuriy私はavrが正しいと思っています、あなたは彼/彼女の質問を誤解しています - 変更可能でない不変なコレクションの問題ではありません - ブロードキャスト変数は、その値がエグゼキュータで変更された場合、この変更を確認してください(Sparkはすべてのエグゼクティブの変更をどのように集約しますか?)。ローカルモードで動作するという事実はバグのように見えますが、クラスタが実際に配布される場所では動作しません。 –

1

Tzachゾハルの答えに代わるものとして、あなたはリストにunzipを使用することができます。RDDの上

scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d"))) 
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27 

scala> val (l1, l2) = myRDD.collect.toList.unzip 
l1: List[String] = List(a, c) 
l2: List[String] = List(b, d) 

それともkeysvalues

scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values) 
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33 
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33 

scala> rdd1.foreach{println} 
a 
c 

scala> rdd2.foreach{println} 
d 
b 
関連する問題