2016-12-09 4 views
-1

Datamameここでは、の節でサンプル操作を実行しようとしています。Dataframe where句を使用している間に例外が発生する

address  district 
hyderabad  001 
delhi   002 
mumbai   003 

は、今私はデータフレームを使用してアドレス、最大(地区)を評価する必要があります。

は、ここに私のサンプル表データです。

ムンバイ003

回避策

結果は次のようになりますこれは私がこれまで試したどのようなコードである

..ここ

SparkConf conf = new SparkConf(); 
     conf.set("spark.app.name", "max"); 
     conf.set("spark.master", "local"); 
     conf.set("spark.ui.port", "7077"); 

     SparkContext ctx=new SparkContext(conf);  
     SQLContext sqlContext = new SQLContext(ctx); 
     DataFrame df = sqlContext.read() 
      .format("com.databricks.spark.csv") 
      .option("inferSchema", "true") 
      .option("header", "true") 
      .load("/Users/hadoop/Downloads/SacramentocrimeJanuary2006.csv"); 
     //df.registerTempTable("consumer"); 
     //Row[] result = df.orderBy("cdatetime").select("cdatetime","address").collect(); 
     //DataFrame a = df.select("address","district").agg(functions.count("district"),functions.col("address")).orderBy("address"); 
     DataFrame b =df.select("address","district").where("district=max(district)"); 
     b.show(); 
     } 

は私です例外:

Cannot evaluate expression: (max(input[1, IntegerType]),mode=Complete,isDistinct=false) 
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.genCode(Expression.scala:233) 
    at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.genCode(interfaces.scala:73) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:106) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:102) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.sql.catalyst.expressions.Expression.gen(Expression.scala:102) 
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:419) 
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:401) 
    at org.apache.spark.sql.catalyst.expressions.EqualTo.genCode(predicates.scala:379) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:106) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:102) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.sql.catalyst.expressions.Expression.gen(Expression.scala:102) 
    at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:42) 
    at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:33) 
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:635) 
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:632) 
    at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:242) 
    at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:71) 
    at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:70) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/12/09 10:50:57 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2) 
+0

あなたはどこを行うことはできません(「地区= MAX(地区)」)、この式は、列スティングに必要な、それは価値にあなたが行きます.IF実行しようとしている道を割り当てることはできませんpersonDs.filter($ "age"> 15) peopleDs.where($ "age"> 15)、peopleDs.where( "age> 15")。値を割り当てることはできません。 –

答えて

0

あなたproblem.likeを解決するために、「集約」と「参加」を使用する必要があり、この:

data.agg(max($"district").as("maxd")).as("d1").join(data.as("d2"), $"d1.maxd" === $"d2.district").select($"address",$"district").show() 

データをソートする機能あなたが

0

あなたが使用できるため、あなたのDataFrame.Itが参考になりますですデータフレーム&を降順に並べます。それからちょうど必要な出力を得るヘッド機能を使用してください。

ここはコードサンプルです。

import org.apache.spark.sql.functions._ 
val df = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load("/user/userapp/sample.csv"); 
val a = df.sort(desc("district")).head 

ここは出力です。

enter image description here

関連する問題