
Scala: Binding attribute, tree: dayofmonth(cast(timestamp#122 as date))

I have a DataFrame df = [id: String, value: Int, type: String, timestamp: java.sql.Date].

The result I need is:

id : 1
totalValue : 72
typeForDay : {"rent": 2, "buy": 2}   --- grouped by id and dayofmonth(col("timestamp")), at most 1 type counted per day
My DataFrame:

+----+------+-------+----------------------+
| id | type | value | timestamp            |
+----+------+-------+----------------------+
| 1  | rent | 12    | 2016-09-19T00:00:00Z |
| 1  | rent | 12    | 2016-09-19T00:00:00Z |
| 1  | buy  | 12    | 2016-09-20T00:00:00Z |
| 1  | rent | 12    | 2016-09-20T00:00:00Z |
| 1  | buy  | 12    | 2016-09-18T00:00:00Z |
| 1  | buy  | 12    | 2016-09-18T00:00:00Z |
+----+------+-------+----------------------+
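(For reference, a minimal sketch of how this sample data could be rebuilt for experimentation. The values come from the table above; the construction itself, and the use of a Spark 1.6-era sqlContext, are my assumptions.)

import java.sql.Date

// Hypothetical reconstruction of the sample DataFrame shown above
val df = sqlContext.createDataFrame(Seq(
    ("1", "rent", 12, Date.valueOf("2016-09-19")),
    ("1", "rent", 12, Date.valueOf("2016-09-19")),
    ("1", "buy",  12, Date.valueOf("2016-09-20")),
    ("1", "rent", 12, Date.valueOf("2016-09-20")),
    ("1", "buy",  12, Date.valueOf("2016-09-18")),
    ("1", "buy",  12, Date.valueOf("2016-09-18"))
  )).toDF("id", "type", "value", "timestamp")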

What I tried:

val ddf = df
  .groupBy("id")
  .agg(collect_set("type"), sum("value") as "totalValue")

val count_by_value = udf { (gti: scala.collection.mutable.WrappedArray[String]) =>
  if (gti == null) null else gti.groupBy(identity).mapValues(_.size)
}


val result = ddf
  .withColumn("totalValue", count_by_value($"collect_list(type)"))
  .drop("collect_list(type)")

This gives me the following error when the code runs (after a few fixes to make it compile):
org.apache.spark.SparkException: Job aborted due to stage failure: Task 115 in stage 15.0 failed 4 times, most recent failure: Lost task 115.3 in stage 15.0 (TID 1357, ip-172-31-9-47.ec2.internal): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: dayofmonth(cast(timestamp#122 as date))#137 
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49) 
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:86) 
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:85) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243) 
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:233) 
    at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:85) 
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection$$anonfun$$init$$2.apply(Projection.scala:62) 
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection$$anonfun$$init$$2.apply(Projection.scala:62) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.<init>(Projection.scala:62) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$newMutableProjection$1.apply(SparkPlan.scala:234) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$newMutableProjection$1.apply(SparkPlan.scala:234) 
    at org.apache.spark.sql.execution.Exchange.org$apache$spark$sql$execution$Exchange$$getPartitionKeyExtractor$1(Exchange.scala:197) 
    at org.apache.spark.sql.execution.Exchange$$anonfun$3.apply(Exchange.scala:209) 
    at org.apache.spark.sql.execution.Exchange$$anonfun$3.apply(Exchange.scala:208) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Couldn't find dayofmonth(cast(timestamp#122 as date))#137 in [customerId#81,timestamp#122,benefit#111] 
    at scala.sys.package$.error(package.scala:27) 
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:92) 
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:86) 
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48) ... 34 more 

Answer

(Using Spark 1.6.2) your code does not raise the exception you describe in my environment. In any case, you can't count types-per-day from ddf: it is already grouped by id only, so the timestamp information is gone by that point.
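(To illustrate, a minimal sketch assuming the ddf from the question; the printSchema output shown in the comments is approximate:)

import org.apache.spark.sql.functions._

// After the question's first aggregation, only these columns survive:
val ddf = df
  .groupBy("id")
  .agg(collect_set("type"), sum("value") as "totalValue")

ddf.printSchema()
// roughly:
// root
//  |-- id: string
//  |-- collect_set(type): array<string>
//  |-- totalValue: long
// There is no timestamp column left, so dayofmonth(col("timestamp"))
// cannot be resolved against ddf at this stage.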

Here is an alternative implementation, using a UDAF (user-defined aggregation function) to "merge" the values of a MapType column into a single map:

import org.apache.spark.sql.functions._

val toMap = udf { (typ: String, count: Int) => Map(typ -> count) }

val result = df 
    // First: group by id AND type, count distinct days and sum value: 
    .groupBy("id", "type").agg(countDistinct(dayofmonth(col("timestamp"))) as "daysPerType", sum("value") as "valPerType") 
    // Then: convert type and count into a single Map column 
    .withColumn("typeForDay", toMap(col("type"), col("daysPerType"))) 
    // Lastly: use a custom aggregation function to "merge" the maps (assuming keys are unique to begin with!) 
    .groupBy("id").agg(sum("valPerType") as "totalValue", CombineMaps(col("typeForDay")) as "typeForDay") 

result.show() 
// prints: 
// +---+----------+------------------------+
// | id|totalValue|              typeForDay|
// +---+----------+------------------------+
// |  1|        72|Map(buy -> 2, rent -> 2)|
// +---+----------+------------------------+

And the implementation of CombineMaps:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object CombineMaps extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = new StructType().add("map", dataType)
  override def bufferSchema: StructType = inputSchema
  override def dataType: DataType = MapType(StringType, IntegerType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, Map[String, Int]())

  // naive implementation - assuming keys won't repeat, otherwise a later value for a key overrides the earlier one
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val before = buffer.getAs[Map[String, Int]](0)
    val toAdd  = input.getAs[Map[String, Int]](0)
    val result = before ++ toAdd
    buffer.update(0, result)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)

  override def evaluate(buffer: Row): Any = buffer.getAs[Map[String, Int]](0)
}
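(If the same type could appear in more than one input map, the ++ above would overwrite counts rather than add them. A small sketch of an additive alternative; this is my own adaptation, not part of the original answer, and mergeCounts is a hypothetical helper name:)

// Additive merge of two count maps: sums values for keys present in both
def mergeCounts(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
  b.foldLeft(a) { case (acc, (k, v)) => acc + (k -> (acc.getOrElse(k, 0) + v)) }

// Inside update/merge one could then write:
//   buffer.update(0, mergeCounts(before, toAdd))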

Awesome! Thanks for the clear explanation :) – Learner


I followed your idea, but I have one doubt: I have another column similar to type (with string values), but it should be grouped only by id. Do I need to handle it separately? – Learner
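(A possible direction for that follow-up, purely my assumption and not part of the original answer: aggregate the extra column per id on its own and join it back; otherCol is a hypothetical column name.)

// Hypothetical second string column ("otherCol") that only needs id-level grouping
val otherPerId = df
  .groupBy("id")
  .agg(collect_set("otherCol") as "otherValues")

// join back onto the result of the answer above
val combined = result.join(otherPerId, "id")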
