2017-11-25 1 views
1

私は、この形態のスパークデータフレームを有する:新しい各要素が別の列のgroupbyに依存するSpark DataFrameに列を追加する方法はありますか?

+----------+-----+-------+-------+ 
| timestamp| lat| lon|user_id| 
+----------+-----+-------+-------+ 
|1511512345|34.12|-120.12|  1| 
|1511512348|34.13|-120.13|  1| 
|1511512349|34.14|-120.14|  1| 
|1511551234|31.11|-122.01|  2| 
|1511551236|31.15|-122.03|  2| 
+----------+-----+-------+-------+ 

およびIは、ユーザごとの位置データの時系列に計算を実行する必要があります。計算は、マップマッチングのための緯度/経度データの全時系列を有することを必要とする(すなわち、GPS位置の道路地図への最適なマッピングを見つけることを必要とする)。結果は、私はその後、データフレームに、私は一緒にこの計算を実行するために、各user_idための全体の時系列を必要とする

| timestamp| lat| lon|user_id|road_id| 
+----------+-----+-------+-------+-------+ 
|1511512345|34.12|-120.12|  1|  12| 
|1511512348|34.13|-120.13|  1|  12| 
|1511512349|34.14|-120.14|  1| 345| 
|1511551234|31.11|-122.01|  2| 737| 
|1511551236|31.15|-122.03|  2| 643| 
+----------+-----+-------+-------+-------+ 

注意を追加したいroad_id Sのシリーズです(つまり、計算はすることはできません行ごとに実行されますが、各グループごとにグループ全体が必要ですuser_id)これはspark dataframe APIを使ってどのように行うことができますか?私はこれを達成するためにgroupbywithColumnまたは他の方法を使用することが可能かどうかはわかりません。

df.sortby('timestamp').groupby('user_id').agg(...) ? 

road_id配列は、典型的には、HMMモデルを用いて計算され、道路網全体緯度/経度配列の機能(Matching GPS traces to a mapに記載のように)です。

基本的には、マップマッチング機能への入力は、緯度/経度値の配列全体となり、出力はあなたが生成するGROUPBYを行う必要があり、同じ長さ

答えて

0

のroad_id値の列になります新しいDataFrameを作成したら、この新しいDataFrameを元のDataFrameに結合します。

0

私はScalaを使用しています(YMMVにはとタグ付けされています)。


私の理解では、あなたがuser_idあたりのデータセットとlat/lonシーケンス全体ですべてのレコードの値を計算したいということです。

それは私にウィンドウの集約の問題と思われます。

ウィンドウ仕様を定義しましょう(再び私はYMMVのようにScalaを使用しています)。

val input = Seq(
    ("1511512345", 34.12, -120.12, 1)) 
    .toDF("timestamp", "lat", "lon", "user_id") 

import org.apache.spark.sql.expressions.Window 
val byUserId = Window.partitionBy("user_id").orderBy("timestamp") 

val inputWithLatsAndLonsCols = input 
    .withColumn("lats", collect_list("lat") over byUserId) 
    .withColumn("lons", collect_list("lon") over byUserId) 
scala> inputWithLatsAndLonsCols.show 
+----------+-----+-------+-------+-------+---------+ 
| timestamp| lat| lon|user_id| lats|  lons| 
+----------+-----+-------+-------+-------+---------+ 
|1511512345|34.12|-120.12|  1|[34.12]|[-120.12]| 
+----------+-----+-------+-------+-------+---------+ 

// define UDF to do the calculation 
// NOTE that the UDF always returns 1 for demo purposes 
val roadId = udf { (lats: Seq[Double], lons: Seq[Double]) => 1 } 

val roads = inputWithLatsAndLonsCols.withColumn("road_id", roadId($"lats", $"lons")) 
scala> roads.show 
+----------+-----+-------+-------+-------+---------+-------+ 
| timestamp| lat| lon|user_id| lats|  lons|road_id| 
+----------+-----+-------+-------+-------+---------+-------+ 
|1511512345|34.12|-120.12|  1|[34.12]|[-120.12]|  1| 
+----------+-----+-------+-------+-------+---------+-------+ 
関連する問題