私はRDD [文字列] DeviceLogがあるDeviceLogのRDDに解析することが可能と仮定します:
case class DeviceLog(val id: String, val timestamp: Long, val onoff: Int)
DeviceLogクラスはかなり単純です。
// initialize contexts
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
これらは、データフレームに使用するsparkコンテキストとSQLコンテキストを初期化します。
ステップ1:
val input = List(
DeviceLog("A",1335952933,1),
DeviceLog("A",1335953754,0),
DeviceLog("A",1335994294,1),
DeviceLog("A",1335995228,0),
DeviceLog("B",1336001513,1),
DeviceLog("B",1336002622,0),
DeviceLog("B",1336006905,1),
DeviceLog("B",1336007462,0))
val df = input.toDF()
df.show()
+---+----------+-----+
| id| timestamp|onoff|
+---+----------+-----+
| A|1335952933| 1|
| A|1335953754| 0|
| A|1335994294| 1|
| A|1335995228| 0|
| B|1336001513| 1|
| B|1336002622| 0|
| B|1336006905| 1|
| B|1336007462| 0|
+---+----------+-----+
ステップ2:デバイスID、パーティション、タイムスタンプによって順序及び(オン/オフ)ペア情報を保持
val wSpec = Window.partitionBy("id").orderBy("timestamp")
val df1 = df
.withColumn("spend", lag("timestamp", 1).over(wSpec))
.withColumn("one", lag("onoff", 1).over(wSpec))
.where($"spend" isNotNull)
df1.show()
+---+----------+-----+----------+---+
| id| timestamp|onoff| spend|one|
+---+----------+-----+----------+---+
| A|1335953754| 0|1335952933| 1|
| A|1335994294| 1|1335953754| 0|
| A|1335995228| 0|1335994294| 1|
| B|1336002622| 0|1336001513| 1|
| B|1336006905| 1|1336002622| 0|
| B|1336007462| 0|1336006905| 1|
+---+----------+-----+----------+---+
ステップ3:計算稼働時間とフィルタ基準によって
val df2 = df1
.withColumn("upTime", $"timestamp" - $"spend")
.withColumn("criteria", $"one" - $"onoff")
.where($"criteria" === 1)
df2.show()
| id| timestamp|onoff| spend|one|upTime|criteria|
+---+----------+-----+----------+---+------+--------+
| A|1335953754| 0|1335952933| 1| 821| 1|
| A|1335995228| 0|1335994294| 1| 934| 1|
| B|1336002622| 0|1336001513| 1| 1109| 1|
| B|1336007462| 0|1336006905| 1| 557| 1|
+---+----------+-----+----------+---+------+--------+
ステップ4:グループidとsum
val df3 = df2.groupBy($"id").agg(sum("upTime"))
df3.show()
+---+-----------+
| id|sum(upTime)|
+---+-----------+
| A| 1755|
| B| 1666|
+---+-----------+
ありがとうございましたありがとうございましたデータフレームなしでこのrdd操作のみを行う方法はありますか? –
私はそれができると思いますが、データフレームはこの種の問題に柔軟性をもたらします。また、データフレームは、rddよりも処理の点でより高速に見えます。 – dumitru