2016-10-02 5 views
1

2つのデータセットがあり、各データセットには2つの要素があります。 以下は例です。スカラーのキーで2つのデータセットを結合する方法

のData1:(名前、動物)

('abc,def', 'monkey(1)') 
('df,gh', 'zebra') 
... 

Data2の:(名前、果物)

('a,efg', 'apple') 
('abc,def', 'banana(1)') 
... 

結果予想:(名前、動物、果物)

('abc,def', 'monkey(1)', 'banana(1)') 
... 

I最初の列 'name'を使用してこれら2つのデータセットを結合します。私はこれを数時間かけて試みましたが、私は理解できませんでした。誰でも助けてくれますか?

val sparkConf = new SparkConf().setAppName("abc").setMaster("local[2]") 
val sc = new SparkContext(sparkConf) 
val text1 = sc.textFile(args(0)) 
val text2 = sc.textFile(args(1)) 

val joined = text1.join(text2) 

上記のコードは機能しません。

+0

を結果をチェックしてみましょう// 2 RDDS

val joined = kvRdd1.join(kvRdd2) 

に参加するには方法? – maasg

+0

どのようなエラーが表示されますか?それはあなたに何を伝えますか? – maasg

+0

@maasg「記号結合を解決できません。 – tobby

答えて

1

join Scalaのシェルからの結果が対のRDDS上に定義されている、すなわち、タイプのRDDSありますRDD[(K,V)]。 必要な最初のステップは、入力データを正しいタイプに変換することです。

我々は最初の(Key, Value)のペアにタイプStringの元データを変換する必要があります。

val parse:String => (String, String) = s => { 
    val regex = "^\\('([^']+)',[\\W]*'([^']+)'\\)$".r 
    s match { 
    case regex(k,v) => (k,v) 
    case _ => ("","") 
    } 
} 

(キーにカンマが含まれているため、我々は、単純なsplit(",")式を使用できないことに注意してください)

その後この関数を使用してテキスト入力データを解析します。

val s1 = Seq("('abc,def', 'monkey(1)')","('df,gh', 'zebra')") 
val s2 = Seq("('a,efg', 'apple')","('abc,def', 'banana(1)')") 

val rdd1 = sparkContext.parallelize(s1) 
val rdd2 = sparkContext.parallelize(s2) 

val kvRdd1 = rdd1.map(parse) 
val kvRdd2 = rdd2.map(parse) 

最後に、さんは

joined.collect 

// res31: Array[(String, (String, String))] = Array((abc,def,(monkey(1),banana(1)))) 

あなたは `(キー、値)`タプルに入力されたテキストを分割している
+0

ありがとうございました! – tobby

+0

もう1つ質問があります。データに一重引用符を保持するにはどうすればよいですか? – tobby

+1

@tobby引用符を保持するために正規表現を変更してください。 – maasg

0

まずデータセットに対してpairRDDを作成してから、結合変換を適用する必要があります。あなたのデータセットは正確ではありません。

以下の例を考慮してください。コードがスカラ

ここ
val pairRDD1 = sc.textFile("/path_to_yourfile/first.txt").map(line => (line.split(" ")(0),line.split(" ")(1))) 

    val pairRDD2 = sc.textFile("/path_to_yourfile/second.txt").map(line => (line.split(" ")(0),line.split(" ")(1))) 

    val joinRDD = pairRDD1.join(pairRDD2) 

    joinRDD.collect 

に以下のようでなければならない

**Dataset1** 

a 1 
b 2 
c 3 

**Dataset2** 

a 8 
b 4 

res10: Array[(String, (String, String))] = Array((a,(1,8)), (b,(2,4))) 
関連する問題