2017-01-11 4 views
0

私は大きいRDD[(K, V)]を持っています。値をマップするには、各キーに共通の大きなデータ構造が必要であり、構築するのに費用がかかります。私はgroupByKeyを実行することはできませんし、後でflatMapを実行することはできません。各キーの値がメモリに収まらないからです。メモリに収まらないので、すべての構造体をロードできません。グループ(または最小回数)ごとに一度構造体の初期化を実行し、それを削除するにはどうすればよいですか?PairRDD、1回キーごとに変数を初期化する

ユースケース

  • 我々はRDD [文字列、文字列]を持っています。キーは、その言語の短いテキストである値の言語を示します。
  • 値のいくつかのトークンを分類する必要があります。そのためには、いくつかのトークンのカテゴリを含む各言語用のトライを作成する必要があります。
  • トライを構築するのは高価なので、foreach(K、V)のペアは作成できません。単一のトライがメモリに収まるが、すべての言語の試行を保つことはできない(異なるキーの数を考えると)。
  • したがって、トライを最小限の回数作成し、メモリ内にいくつかの数だけ保持する必要があります。 、

答えて

0

PLEはmapPartitions続いrepartitionAndSortWithinPartitionsRDDを使用します。

flatMapGroups続い groupBy
val partitioner: org.apache.spark.Partitioner = ??? 

rdd.repartitionAndSortWithinPartition(partitioner).mapPartitions { iter => { 
    var currentKey: Option[String] = None 
    var currentTrie: Option[Trie] = None 
    iter.map { 
    case (k, v) => 
     .. // if Option(k) != currentKey update currentKey and currentTrie 
     .. // Proceed with logic 
    } 
}} 

またはDataset

rdd.toDS.groupByKey(_._1).flatMapGroups { case (key, iter) => { 
    val currentTrie: Trie = ??? 
    iter.map { case (_, v) => ??? } 
}) 

そのRDD対応Datasetと違っサイズので、一度にメモリにすべての値をロードする必要はありません。個々のグループの問題は問題にならないはずです。

両方のソリューションでは完全シャッフルが必要ですが、各参照構造体はキーごとに1回のみ初期化されます。

0

はあなたのユースケースによっては、この大規模なデータ構造

val broadcastVar = sc.broadcast(LargeThingy()) 
broadcastVar.value 

http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

のための放送変数を使用するか、それとも、rdd.foreachPartitionを使用して、パーティションごとに1例の大規模なブツを初期化することができますパーティション内のデータを処理します。

rdd.foreachPartition { case (data) => 
    val largeThing = LargeThing() 
    data.foreach { //etc. } 
} 
+0

私はブロードキャスト変数を見ていきます。 foreachPartitionを使用して、同じパーティションの同じキーの値をグループ化するにはどうすればよいですか?コードが機能するには、データのすべての値が同じキーに関係している必要があります。 – Miguel

+0

あなたの追加情報があれば、私の答えは役に立たない – ImDarrenG

関連する問題