2016-10-14 7 views
1

スパークデータフレームを計算するためのより効率的な方法:私はと類似の販売データフレーム持って

id | date    | amount 
-----|-------------------|------- 
1 |2016-03-04 12:03:00|10.40 
1 |2016-03-04 12:05:10|5.0 
1 |2016-03-04 12:15:50|11.30 
1 |2016-03-04 12:16:00|9.40 
1 |2016-03-04 12:30:00|10.0 
1 |2016-03-04 12:40:00|5.40 

そして、私は10分の時間枠で時間によってグループにしようと量を合計して作成していますが似たデータフレーム:

date    | amount 
-----------------|------- 
2016-03-04 12:00 |0.0 
2016-03-04 12:10 |15.40 
2016-03-04 12:20 |20.70 
2016-03-04 12:30 |10.0 
2016-03-04 12:40 |5.40 

私は、リストに追加し、リストとデータフレームを作成するよりも、データフレーム、グループとの和をフィルタリングし、ループに日時変数を試してみました。このコードは、10分の時間枠内で2ヶ月の売上高を計算するために20分ほどかかることがあります214626個の行を含むファイルで

bar_list = [] 
while date_loop < final_date: 
    start_time = date_loop - datetime.timedelta(minutes=10) 
    end_time = date_loop - datetime.timedelta(seconds=1) 
    df_range = (df_sale 
       .filter((df_sale.date >= start_time) & (df_sale.date <= end_time)) 
       .groupby() 
       .sum('amount')) 
    bar_list.append((date_loop,df_range.head()['sum(amount)'])) 
    date_loop += datetime.timedelta(minutes=10) 

fields = ['date','amount'] 
df = sqlContext.createDataFrame(bar_list,fields).na.fill(0) 

これを行うより効率的な方法はありますか?私は労働者の間で変数を共有できることを理解しています。リストを共有できますか?私のボトルネックをリストに追加していますか?

ありがとうございます。

答えて

0

あなたは文字列として処理したい場合、これは少し汚れすることができ、あなたはこれを試すことができます。これはかかった何時間

def getDTClosestMin(s:String):String = { 
s.substring(0,4)+"-"+s.substring(5,7)+"-"+s.substring(8,10)+" " + 
s.substring(11,13)+":" + 
((((s.substring(14,16)).toInt)*0.1).ceil)*10).round.toString.padTo(2,"0").mkString } 


timeAmtRDD.map(x=> x._1+","+x._2+","+x._3) 
    .map(x=>x.split(",")) 
    .map(x=> (getDTClosestMin(x(1)), x(2).toFloat)) 
    .reduceByKey(_+_) 
    .sortByKey().toDF("date", "amount").show() 


Output: 
+----------------+------+ 
|   date|amount| 
+----------------+------+ 
|2016-03-04 12:10| 15.4| 
|2016-03-04 12:20| 20.7| 
|2016-03-04 12:30| 10.0| 
|2016-03-04 12:40| 5.4| 
+----------------+------+ 

アップデートを..;)

関連する問題