2016-10-21 9 views
1

2人のワーカーインスタンスでsparkアプリケーションを実行中にエラー "Failed to get broadcast_5_piece0 of broadcast5"を取得しています。 私はspark.cleaner.ttlも設定しました。その後も同じエラーが発生しています。 誰でもお手伝いできますか?ブロードキャストのbroadcast_5_piece0を取得できませんでした。

フルスタックトレースはここで見つけることができます。コードを追加

java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_5_piece0 of broadcast_5 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1212) 
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) 
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) 
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Failed to get broadcast_5_piece0 of broadcast_5 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205) 
    ... 11 more 

............

public class Insert_into_cassandra implements Serializable { 
    static List<String> signal_name_pass=new ArrayList<String>(); 
    static int count=4; 
    static SparkConf conf=new SparkConf().setAppName("Insert_into_cassandra").setMaster("local").set("spark.cassandra.connection.host", "127.0.0.1"); 
    static JavaSparkContext jspc=new JavaSparkContext(conf); 
    static SparkContextJavaFunctions functions=CassandraJavaUtil.javaFunctions(jspc); 
    static Insert_into_cassandra iic=new Insert_into_cassandra(); 
    static int value_in_db=0; 
public static void main(String gg[]) 
{ 


JavaRDD<String> rbmfile=jspc.textFile("/home/amd/Desktop/prac"); 


JavaPairRDD<String, Parse_Object> signal_name=rbmfile.mapToPair(new PairFunction<String, String, Parse_Object>() { 


    public Tuple2<String, Parse_Object> call(String x) throws Exception { 

     return new Tuple2<String, Parse_Object>(x.split(" ")[0], new Parse_Object(x.split(" ")[1],x.split(" ")[2],x.split(" ")[3])); 
    } 
}); 



JavaRDD<CassandraRow> signal_name_cassandra=functions.cassandraTable("tutorialspoint", "stuff_id_2"); 



JavaRDD<CassandraRow> sort=signal_name_cassandra.sortBy(new Function<CassandraRow, String >() { 


    public String call(CassandraRow x) throws Exception { 

     return x.getString(0); 
    } 
}, false, 1).coalesce(1); 


value_in_db=Integer.parseInt(sort.first().getString(0)); 


JavaPairRDD<String, String> take_signal_name=signal_name_cassandra.mapToPair(new PairFunction<CassandraRow, String, String>() { 

    public Tuple2<String, String> call(CassandraRow x) throws Exception { 
     // TODO Auto-generated method stub 

     return new Tuple2<String, String>(x.getString(1),x.getString(0)); 
    } 
}); 



/*JavaPairRDD<String, String> getting_max_id=signal_name_cassandra.mapToPair(new PairFunction<CassandraRow, String, String>() { 

    @Override 
    public Tuple2<String, String> call(CassandraRow x) throws Exception { 
     // TODO Auto-generated method stub 

     return new Tuple2<String, String>(x.getString(0),x.getString(1)); 
    } 
});*/ 


JavaPairRDD<String, Tuple2<Optional<String>, Parse_Object>> join=take_signal_name.rightOuterJoin(signal_name); 



JavaPairRDD<String, String> getting_id=join.mapToPair(new PairFunction<Tuple2<String,Tuple2<Optional<String>,Parse_Object>>, String, String>() { 

    public Tuple2<String, String> call(
      Tuple2<String, Tuple2<Optional<String>, Parse_Object>> x) 
      throws Exception { 

     if(x._2()._1().isPresent()) 
     { 
      System.out.println("if----"+x._1()); 
      return new Tuple2<String, String>(x._1().toString(), x._2()._1().toString()); 

     } 

     else 
     { 
      signal_name_pass.add(x._1()); 

     } 
    return null; 

    } 


}).filter(new Function<Tuple2<String,String>, Boolean>() { 


    public Boolean call(Tuple2<String, String> x) throws Exception { 
     // TODO Auto-generated method stub 
     return x!=null; 
    } 
}); 
getting_id.saveAsTextFile("/home/amd/Desktop/smal/get13"); 

答えて

1

がスパーククリーナーTTLはほとんど常に間違ったことです設定スパークの現代版でやるべきこと。これにより、重要なキャッシュデータが早期に削除される可能性があります。 Context Cleanerをビルドすることは、その作業をより安全に行うことができます。執行上の

  1. どれでも不審なログ:

    物事を解決しないTTLをオフにする場合は、次のように多くの関連する詳細情報を提供する必要がありますか?

  2. 例外の完全なスタックトレース?
  3. 実行しているコードの例?
+0

2)完全なスタックトレースを見つけてください。 – Aman

+0

私はブロードキャスト変数を使用していないと思いますので、ブロードキャスト変数スパークは何を使用していますか? – Aman

+0

1)エグゼキュータの不意のログはありません。 3)私はそれをろ過した後、カサンドラに2列を挿入しています。あなたが何かもっと欲しいかどうか私に教えてください。 – Aman

関連する問題