2016-04-19 15 views
0

下記のように3フィールドのrddがあります。1フィールドrddの値の選択方法rddの第2フィールドにある場合のみ

1,2,6 
2,4,6 
1,4,9 
3,4,7 
2,3,8 

今、上記のrddから、私はrddにしたがいます。

2,4,6 
3,4,7 
2,3,8 

結果rddには1から始まる行がありません.1は入力rddの2番目のフィールドにはありません。

+0

あなたは番目を提供することができますeフルタイプの入出力RDDを作成し、データをどのようにフィルタリングして変換するかについての規則を詳述します。 – Aivean

+0

フィールドとフィールド2は文字列、フィールド3は整数です。 Field1の値がField2で使用可能な出力の行だけが必要です。上記の例では、2と3はrddのField2にありますが、1はField2にありません。 – Ahmad

+0

あなたの質問をより良い説明やより良い例で更新する必要があります。 – Vishnu667

答えて

3

[OK]を、私はあなたが何をしたいのかを正しく理解している場合、2つの方法があります。

  1. ありRDDがあるあなたのRDD最初RDD「は、第2フィールド」と第二の一意の値が含まれている2に分割「第1の値」をキーとする。その後、rddsを一緒に結合します。このアプローチの欠点は、distinctjoinが動作が遅いことです。

    val r: RDD[(String, String, Int)] = sc.parallelize(Seq(
        ("1", "2", 6), 
        ("2", "4", 6), 
        ("1", "4", 9), 
        ("3", "4", 7), 
        ("2", "3", 8) 
    )) 
    
    val uniqueValues: RDD[(String, Unit)] = r.map(x => x._2 ->()).distinct 
    val r1: RDD[(String, (String, String, Int))] = r.map(x => x._1 -> x) 
    
    val result: RDD[(String, String, Int)] = r1.join(uniqueValues).map {case (_, (x, _)) => x} 
    
    result.collect.foreach(println) 
    
  2. あなたのRDDは比較的小さく、二値のSetはその後、その後、あなたはメモリ内の最初のステップとして設定されているを作成することができ、すべてのノードでメモリに完全に収まるすべてのノードにブロードキャストすることができた場合ちょうどあなたのRDDをフィルタリング:

    val r: RDD[(String, String, Int)] = sc.parallelize(Seq(
        ("1", "2", 6), 
        ("2", "4", 6), 
        ("1", "4", 9), 
        ("3", "4", 7), 
        ("2", "3", 8) 
    )) 
    
    val uniqueValues = sc.broadcast(r.map(x => x._2).distinct.collect.toSet) 
    
    val result: RDD[(String, String, Int)] = r.filter(x => uniqueValues.value.contains(x._1)) 
    
    result.collect.foreach(println) 
    

どちらの例の出力を:

(2,4,6) 
(2,3,8) 
(3,4,7) 
関連する問題