2017-03-01 5 views
1

デバイス、タイムスタンプ、オン/オフ形式に従ってRDD [String]を持っています。各デバイスがスワイプされる時間を計算するにはどうすればよいですか? ?手段1と、例えばスカラーの隣接要素をマップする方法

A,1335952933,1 
A,1335953754,0 
A,1335994294,1 
A,1335995228,0 
B,1336001513,1 
B,1336002622,0 
B,1336006905,1 
B,1336007462,0 

中間工程1

A,((1335953754 - 1335952933),(1335995228 - 1335994294)) 
B,((1336002622- 1336001513),(1336007462 - 1336006905)) 

中間工程2

(A,(821,934)) 
(B,(1109,557)) 

出力手段0

オフに

(A,1755) 
(B,1666) 

答えて

2

私はRDD [文字列] DeviceLogがあるDeviceLogのRDDに解析することが可能と仮定します:

case class DeviceLog(val id: String, val timestamp: Long, val onoff: Int) 

DeviceLogクラスはかなり単純です。

// initialize contexts 
val sc = new SparkContext(conf) 
val sqlContext = new HiveContext(sc) 

これらは、データフレームに使用するsparkコンテキストとSQLコンテキストを初期化します。

ステップ1

val input = List(
    DeviceLog("A",1335952933,1), 
    DeviceLog("A",1335953754,0), 
    DeviceLog("A",1335994294,1), 
    DeviceLog("A",1335995228,0), 
    DeviceLog("B",1336001513,1), 
    DeviceLog("B",1336002622,0), 
    DeviceLog("B",1336006905,1), 
    DeviceLog("B",1336007462,0)) 

val df = input.toDF() 
df.show() 
+---+----------+-----+ 
| id| timestamp|onoff| 
+---+----------+-----+ 
| A|1335952933| 1| 
| A|1335953754| 0| 
| A|1335994294| 1| 
| A|1335995228| 0| 
| B|1336001513| 1| 
| B|1336002622| 0| 
| B|1336006905| 1| 
| B|1336007462| 0| 
+---+----------+-----+ 

ステップ2:デバイスID、パーティション、タイムスタンプによって順序及び(オン/オフ)ペア情報を保持

val wSpec = Window.partitionBy("id").orderBy("timestamp") 

    val df1 = df 
     .withColumn("spend", lag("timestamp", 1).over(wSpec)) 
     .withColumn("one", lag("onoff", 1).over(wSpec)) 
     .where($"spend" isNotNull) 
    df1.show() 

+---+----------+-----+----------+---+ 
| id| timestamp|onoff|  spend|one| 
+---+----------+-----+----------+---+ 
| A|1335953754| 0|1335952933| 1| 
| A|1335994294| 1|1335953754| 0| 
| A|1335995228| 0|1335994294| 1| 
| B|1336002622| 0|1336001513| 1| 
| B|1336006905| 1|1336002622| 0| 
| B|1336007462| 0|1336006905| 1| 
+---+----------+-----+----------+---+ 

ステップ3:計算稼働時間とフィルタ基準によって

val df2 = df1 
     .withColumn("upTime", $"timestamp" - $"spend") 
     .withColumn("criteria", $"one" - $"onoff") 
     .where($"criteria" === 1) 
    df2.show() 

| id| timestamp|onoff|  spend|one|upTime|criteria| 
+---+----------+-----+----------+---+------+--------+ 
| A|1335953754| 0|1335952933| 1| 821|  1| 
| A|1335995228| 0|1335994294| 1| 934|  1| 
| B|1336002622| 0|1336001513| 1| 1109|  1| 
| B|1336007462| 0|1336006905| 1| 557|  1| 
+---+----------+-----+----------+---+------+--------+ 

ステップ4:グループidとsum

val df3 = df2.groupBy($"id").agg(sum("upTime")) 
    df3.show() 

+---+-----------+ 
| id|sum(upTime)| 
+---+-----------+ 
| A|  1755| 
| B|  1666| 
+---+-----------+ 
+0

ありがとうございましたありがとうございましたデータフレームなしでこのrdd操作のみを行う方法はありますか? –

+0

私はそれができると思いますが、データフレームはこの種の問題に柔軟性をもたらします。また、データフレームは、rddよりも処理の点でより高速に見えます。 – dumitru

関連する問題