2017-11-11 3 views
0

から見上げます。形式は次のとおりです(name:String、value:Int)。下記の内容例:スパークは、私は、スパークプロジェクトをやっているとニーズが最善の方法で、以下の問題を解決する方法について助言小さなファイル

Davi,130 
Joel,20 
Emma,500 

私はこのようなレコードの4行で別の小さなファイル、(クラス名:文字列、のminValue:INT、maxValueの:INT)持っている今、私はクラスを検索してファイルを作成 が必要名前最小値と最大値、以下上記のレコードの出力間の値に基づいて:

First,500,9999999 
Second,100,499 
Third,0,99 
Unknown,-99999,0 

私はMainDF内の各値のため、この小さなファイルを検索し、からの値の範囲に基づいて、クラス名を追加する必要があります小さいファイル。例:

Davi,130,Second 
Joel,20,Third 
Emma,500,First 

これは私が書いたコードです:私はここにUDFを書くために持っていると思う

//Main Data read, millions of records 
val MainData = sc.textFile("/mainfile.csv") 
case class MainType(Name:String,value:Int) 
val MainDF = MainData .map(line => line.split(",")).map(e =>MainType(e(0),e(1).toInt))).toDF 
MainDF.registerTempTable("MainTable") 
val refData = sc.broadast(sc.textFile("/refdata.csv")) 
case class refDataType (className:String,minValue:Int,maxValue:Int) 
//ref data, just 4 records 
val refRDD = refData.map(line=> line.split(",")).map(e => refDataType (e(0) , e(1).toInt, e(2).toInt)) 

が、私はUDFでデータフレームを使用する方法を知らない、またはでこれを行うにはどのような方法がありますspark scala

+0

これらの4つの条件が常に同じになる場合は、そのファイルからデータフレームを作成する代わりにifelse ifelse UDFを書くことができます。 – philantrovert

+0

@philantrovert見ていただきありがとうございます。 – user3124284

+0

ここにUDFを使用する以外に、他の方法はありますか? – user3124284

答えて

1

textFileを使用してRDDとしてファイルを読むことができます。ファイルは非常に小さく(必要に応じてブロードキャストする可能性があるため)、収集することができます。

RDDを収集してアレイを取得したら、Rangeを作成してUDFを作成し、値がその範囲内にあるかどうかを確認できます。

val rdd = sc.parallelize(Array(
("First",500,9999999), 
("Second",100,499), 
("Third",0,99), 
("Unknown",-99999,0) 
)) 

val dataArr = rdd.map{ case (className, min, max) => 
         (className, Range(min, max)) }.collect 
// First Element will be the Class Name 
// Second will be the Range(min, max) 
// sc.broadcast(dataArr) here 

val getClassName = udf {(x: Int) => { 
        dataArr.map{ e => 
         if (e._2.contains(x)) e._1.toString 
         else null.asInstanceOf[String] } 
        .filter(_ != null) 
        .apply(0) }} 

df.withColumn("ClassName", getClassName($"VALUE")).show 
+----+-----+---------+ 
|NAME|VALUE|ClassName| 
+----+-----+---------+ 
|Davi| 130| Second| 
|Joel| 20| Third| 
|Emma| 500| First| 
+----+-----+---------+ 

私は確かに良い解決策があるかもしれません。

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType} 
val mainSchema = StructType(Seq(StructField("name", StringType, false), 
StructField("value", IntegerType, false))) 
val mainDf = spark.read.schema(mainSchema).csv("/tmp/b.txt") 
val lookupSchema = StructType(Seq(StructField("class_name", StringType, false), StructField("min_value", IntegerType, false), 
StructField("max_value", IntegerType, false))) 
val lookupDf = spark.read.schema(lookupSchema).csv("/tmp/a.txt") 
val result = mainDf.join(lookupDf, $"value" <= $"max_value" && $"value" > $"min_value") 
result.show() 

私が最もパフォーマンスの方法はこの1つまたは1つであるかどうかを確認していない:

1

ここで最も簡単な方法は、ファイルのように、csvデータソースを使用し、標準SparkSQLを使用してそれらを結合し、両方を読むことです@philantrovertによって提案されています(これは使用しているSparkのバージョンによっても異なる場合があります)。あなたは両方を試して、自分で決めるべきです。

関連する問題