2017-01-07 9 views
2

私はスパークに新しいですし、私はそのようなデータをcsvファイルを持っている:pysparkサブストリングと集約

date,   accidents, injured 
2015/20/03 18:00 15,   5 
2015/20/03 18:30 25,   4 
2015/20/03 21:10 14,   7 
2015/20/02 21:00 15,   6 

が、私はそれが起こったときの特定の時間で、このデータを集計したいと思います。私の考えは、「年/月/日hh」に日付を文字列で区切って分をつけることで、私はそれをキーにすることができます。私は平均して事故を起こし、1時間ごとに負傷したかった。多分、幽霊とは違った、よりスマートな方法がありますか?

ありがとうございました!

答えて

4

これは、後で何をするつもりかによって決まります。

あなたが提案として行うことであろう最も簡単な方法:日付文字列をサブストリング、その後集計:

data = [('2015/20/03 18:00', 15, 5), 
    ('2015/20/03 18:30', 25, 4), 
    ('2015/20/03 21:10', 14, 7), 
    ('2015/20/02 21:00', 15, 6)] 
df = spark.createDataFrame(data, ['date', 'accidents', 'injured']) 

df.withColumn('date_hr', 
       df['date'].substr(1, 13) 
    ).groupby('date_hr')\ 
     .agg({'accidents': 'avg', 'injured': 'avg'})\ 
     .show() 

あなたは、しかし、後にいくつかのより多くの計算をしたい場合は、あなたがデータを解析することができますa TimestampType()とし、その日時を抽出します。

import pyspark.sql.types as typ 
from pyspark.sql.functions import col, udf 
from datetime import datetime 

parseString = udf(lambda x: datetime.strptime(x, '%Y/%d/%m %H:%M'), typ.TimestampType()) 
getDate = udf(lambda x: x.date(), typ.DateType()) 
getHour = udf(lambda x: int(x.hour), typ.IntegerType()) 

df.withColumn('date_parsed', parseString(col('date'))) \ 
    .withColumn('date_only', getDate(col('date_parsed'))) \ 
    .withColumn('hour', getHour(col('date_parsed'))) \ 
    .groupby('date_only', 'hour') \ 
    .agg({'accidents': 'avg', 'injured': 'avg'})\ 
    .show() 
+0

は、y [0] [:13]を使って簡単にマッピングすると部分文字列に管理されます。あなたのソリューションはよりエレガントに見えます。ありがとうございました!もう1つの質問があります。他のデータを持つ別のファイルがある場合、別の年からその事故や怪我の平均をどのように取ることができますか?すべてを1つのファイルに入れてから計算を実行しますか? – sampak

+0

私はそのファイルを読んでそのデータだけで集計するか、必要に応じて一度に結果を出すようにします(そして、あなたがSpark 2.0で作業していると仮定して).union(...) 2つ(またはそれ以上)の「DataFrames」を一緒にします。 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.union – TDrabas