私は、この形態のスパークデータフレームを有する:新しい各要素が別の列のgroupbyに依存するSpark DataFrameに列を追加する方法はありますか?
+----------+-----+-------+-------+
| timestamp| lat| lon|user_id|
+----------+-----+-------+-------+
|1511512345|34.12|-120.12| 1|
|1511512348|34.13|-120.13| 1|
|1511512349|34.14|-120.14| 1|
|1511551234|31.11|-122.01| 2|
|1511551236|31.15|-122.03| 2|
+----------+-----+-------+-------+
およびIは、ユーザごとの位置データの時系列に計算を実行する必要があります。計算は、マップマッチングのための緯度/経度データの全時系列を有することを必要とする(すなわち、GPS位置の道路地図への最適なマッピングを見つけることを必要とする)。結果は、私はその後、データフレームに、私は一緒にこの計算を実行するために、各user_id
ための全体の時系列を必要とする
| timestamp| lat| lon|user_id|road_id|
+----------+-----+-------+-------+-------+
|1511512345|34.12|-120.12| 1| 12|
|1511512348|34.13|-120.13| 1| 12|
|1511512349|34.14|-120.14| 1| 345|
|1511551234|31.11|-122.01| 2| 737|
|1511551236|31.15|-122.03| 2| 643|
+----------+-----+-------+-------+-------+
注意を追加したいroad_id
Sのシリーズです(つまり、計算はすることはできません行ごとに実行されますが、各グループごとにグループ全体が必要ですuser_id
)これはspark dataframe APIを使ってどのように行うことができますか?私はこれを達成するためにgroupby
とwithColumn
または他の方法を使用することが可能かどうかはわかりません。
df.sortby('timestamp').groupby('user_id').agg(...) ?
road_id
配列は、典型的には、HMMモデルを用いて計算され、道路網全体緯度/経度配列の機能(Matching GPS traces to a mapに記載のように)です。
基本的には、マップマッチング機能への入力は、緯度/経度値の配列全体となり、出力はあなたが生成するGROUPBYを行う必要があり、同じ長さ