1

私はSparkを使ってCassandraからデータを読み込もうとしています。CassandraでSparkジョブアボートの根本原因を見つける方法(ClassCastException - ShuffleMapTask to Task)?

DataFrame rdf = sqlContext.read().option("keyspace", "readypulse") 
      .option("table", "ig_posts") 
      .format("org.apache.spark.sql.cassandra").load(); 

    rdf.registerTempTable("cassandra_table"); 
    System.out.println(sqlContext.sql("select count(external_id) from cassandra_table").collect()[0].getLong(0)); 

タスクは次のエラーで失敗します。 ShuffleMaptaskがなぜ呼び出されているのか、それをTaskにキャストするのがなぜ問題なのか理解できません。

16/03/30 02:27:15 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, ip-10-165-180-22.ec2.internal): 
      java.lang.ClassCastException: 
      org.apache.spark.scheduler.ShuffleMapTask 
       cannot be cast to org.apache.spark.scheduler.Task 
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193) 
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
         at java.lang.Thread.run(Thread.java:745) 
16/03/30 02:27:15 INFO TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) on executor ip-10-165-180-22.ec2.internal: 
      java.lang.ClassCastException (org.apache.spark.scheduler.Shuf 
       fleMapTask 
      cannot be cast to org.apache.spark.scheduler.Task) [duplicate 1] 

16/03/30 02:27:15 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 

私はカサンドラ2.2(Datastaxコミュニティ)、1.6スパーク、EMR 4.4を使用して、スパーク-Cassandraのコネクタ-java_2.10 1.6.0-M1(これも1.5.0を試して)い

I次のコードでも同じことを試みましたが、同じエラーが発生しました。

CassandraJavaRDD<CassandraRow> cjrdd = functions.cassandraTable(
      KEYSPACE, tableName).select(columns); 
    logger.info("Got rows from cassandra " + cjrdd.count()); 

    JavaRDD<Double> jrdd2 = cjrdd.map(new Function<CassandraRow, Double>() { 
     @Override 
     public Double call(CassandraRow trainingRow) throws Exception { 
      Object fCount = trainingRow.getRaw("follower_count"); 
      double count = 0; 
      if (fCount != null) { 
       count = (Long) fCount; 
      } 
      return count; 
     } 
    }); 
    logger.info("Mapper done : " + jrdd2.count()); 
    logger.info("Mapper done values : " + jrdd2.collect()); 
+0

ソートしましたか?問題の本当の原因は何でしたか? –

答えて

1

私が原因 --conf spark.executor.userClassPathFirst=trueに最近、同様の問題に遭遇してきました。 Spark's official documentationを引用

spark.driver.userClassPathFirstとして(実験)同じ機能をspark.executor.userClassPathFirstが、インスタンスをエグゼキュータに適用されます。 「しかし、これらは実行時に追加され、ユーザーのjarファイルは、Hadoopのを含めるべきではありませんやライブラリをスパーク。」

は、私はこれらの例外は、いくつかのjarバージョンの競合が原因だったと思う、とスパーク文書によって、

関連する問題