2016-11-23 5 views
0

私はすでに同じパーティショナーを使用してパーティション化され、HDFSに格納されている2つのデータセットを持っています。これらのデータセットは、制御できない2つの異なるSparkジョブから出力されます。今、これらの2つのデータセットを結合して、異なる情報を生成することにします。SparkでHDFSの2つのデータファイルを結合しますか?

Example: 

Data Set 1: 
ORDER_ID CUSTOMER_ID ITEMS 
OD1  C1   1,2,3 -> partition 0 
OD2  C2   3,4,5 -> partition 0 
OD3  C4   1,2,3 -> partition 1 
OD4  C3   1,3  -> partition 1 

Data Set 1: 
ORDER_ID CUSTOMER_ID REFUND_ITEMS 
OD1  C1   1  -> partition 0 
OD2  C2   5  -> partition 0 
OD3  C4   2,3 -> partition 1 
OD4  C3   3  -> partition 1 

Options are: 

1) Create two RDDs from the datasets and join them. 
2) Create one RDD using one of the dataset. 
    -> For each partition in the RDD get the actual partition id i.e OD1 -> 0, OD3 -> 1 (using some custom logic) 
    -> Load data from HDFS for that partition for dataset 2 
    -> Iterate over both the dataset and produce combined result. 

For option 2 I don't know how to read a specific file form HDFS in the Spark executor. (I have the full URI for location of the file) 

答えて

0

2つのデータフレームを作成し、SQLを使用して結合できます。以下のコードを見つけてください。

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder 
import org.apache.spark.sql.Encoder 

// For implicit conversions from RDDs to DataFrames 
import spark.implicits._ 

case class struc_dataset(ORDER_ID: String,CUSTOMER_ID: String, ITEMS:String) 

//Read file1 
val File1DF = spark.sparkContext 
    .textFile("temp/src/file1.txt") 
    .map(_.split("\t")) 
    .map(attributes => struc_dataset(attributes(0), attributes(1),attributes(3))).toDF() 

//Register as Temp view - Dataset1 
File1DF.createOrReplaceTempView("Datset1") 

//Read file2 
val File2DF = spark.sparkContext 
    .textFile("temp/src/file2.txt") 
    .map(_.split("\t")) 
    .map(attributes => struc_dataset(attributes(0),attributes(1),attributes(3))).toDF() 

//Register as Temp view - Dataset2 
File2DF.createOrReplaceTempView("Datset2") 

// SQL statement to create final dataframe (JOIN) 
val finalDF = spark.sql("SELECT * FROM Dataset1 ds1 JOIN Dataset2 ds2 on ds1.ORDER_ID=ds2.ORDER_ID AND ds1.CUSTOMER_ID=ds2.CUSTOMER_ID") 

finalDF.show() 
関連する問題