
Spark cluster driver fails with the error: unable to assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

 JavaPairInputDStream<String, byte[]> messages = KafkaUtils.createDirectStream(
      jssc,
      String.class,
      byte[].class,
      StringDecoder.class,
      DefaultDecoder.class,
      kafkaParams,
      topicSet
 );
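The call above assumes that kafkaParams and topicSet are defined earlier in the driver; they are not shown in the question. A minimal sketch of that setup, with a placeholder broker address and topic name, could look like this:

 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;

 // Placeholder connection details; the real broker list and topic name are not part of the original post.
 Map<String, String> kafkaParams = new HashMap<String, String>();
 kafkaParams.put("metadata.broker.list", "localhost:9092");
 Set<String> topicSet = new HashSet<String>(Collections.singletonList("customer-activity"));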

 JavaDStream<CustomerActivityRequestModel> customerActivityStream = messages.map(new Function<Tuple2<String, byte[]>, CustomerActivityRequestModel>() { 
      /** 
     * 
     */ 
     private static final long serialVersionUID = -75093981513752762L; 

      @Override 
      public CustomerActivityRequestModel call(Tuple2<String, byte[]> tuple2) throws IOException, ClassNotFoundException { 

       CustomerActivityRequestModel x = NearbuySessionWorkerHelper.unmarshal(CustomerActivityRequestModel.class, tuple2._2); 
       LOGGER.info(x.getActionLink()); 
       LOGGER.info(x.getAppVersion()); 
       return x; 
      } 
     }); 




    customerActivityStream.foreachRDD(new VoidFunction<JavaRDD<CustomerActivityRequestModel>>() { 



     /** 
     * 
     */ 
     private static final long serialVersionUID = -9045343297759771559L; 

     @Override 
     public void call(JavaRDD<CustomerActivityRequestModel> customerRDD) throws Exception { 
      Configuration hconf = HBaseConfiguration.create(); 
      hconf.set("hbase.zookeeper.quorum", "localhost"); 
      hconf.set("hbase.zookeeper.property.clientPort", "2181"); 
      //hconf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName); 
      hconf.set(TableInputFormat.INPUT_TABLE, hbaseTableName); 
      Job newAPIJobConfiguration1 = Job.getInstance(hconf); 
      newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName); 
      newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class); 

      JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts= customerRDD.mapToPair(new PairFunction<CustomerActivityRequestModel, ImmutableBytesWritable, Put>() { 


       /** 
       * 
       */ 
       private static final long serialVersionUID = -6574479136167252295L; 

       @Override 
       public Tuple2<ImmutableBytesWritable, Put> call(CustomerActivityRequestModel customer) throws Exception { 


          Bytes.toBytes("long"),Bytes.toBytes(customer.getLongitude())); 
        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put); 
       } 
      }); 
      hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration()); 

     } 
    }); 

Answers


The jar you are running must be on the classpath of every node; in my case that fixed this same problem.
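One way to achieve that is to pass the assembled application jar to spark-submit, which ships it to the executors. Alternatively, the driver can register the jar on its SparkConf. A minimal sketch of the latter, where the app name, jar path, and batch interval are placeholders rather than values from the original post:

 import org.apache.spark.SparkConf;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;

 // Register the assembled application jar so every executor downloads it onto its classpath.
 SparkConf conf = new SparkConf()
     .setAppName("CustomerActivityStreaming")                               // placeholder app name
     .setJars(new String[] { "/path/to/customer-activity-assembly.jar" });  // placeholder jar path
 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));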
