2016-03-23 11 views
0

Cassandra & DataStaxコミュニティ、私は誰かが賢明な人を助けてくれると願っています。Spark SQL UDFタスクがシリアル化されていない

アサーションコードをHadoopからSparkに移行し、Cassandra(DataStax Enterprise経由)上で実行しています。生産では4.7、開発では4.8でした。

Java 7の開発環境では、Java 7/8が開発中です。

必要なDataFrame変換がいくつかあります。メモリ内のDataFrameに対してSpark SQLContext経由で使用されるUDFを記述すると、その作業が実行されると考えられます。これらの主なものは次のとおりです。

  1. 私たちのデータのすべての単一のテキスト値は接頭辞付きで後に "。つまり、「いくつかのデータ」これは非常に面倒なので、これらのそれぞれをクリーニングしたいと思います。
  2. 多くの他の列から構成されたハッシュキーを含む列を追加したいとします。

コードは次のとおりです。これはsqlContextにUDF呼び出しを含めることなくうまく実行されますが、追加されるとすぐに「タスクがシリアライズできません」というエラーが表示されます

スレッド「main」の例外org.apache.spark.SparkException:シリアライズ可能

私はこのクラスの基本クラスとして "implements Serializable"を入れてみましたが、これはエラークラスをチェーンの次のものに変更しますが、Exceptionクラスでは失敗するシリアライズ可能ではありません...おそらく私は間違った方向に向かっています。

また、ラムダとしてUDFを実装しようとしましたが、同じエラーが発生します。

誰かが私が間違っていることを指摘できたら、それは非常に感謝しています!

public class entities implements Serializable{ 
    private spark_context m_spx = null; 
    private DataFrame m_entities = null; 
    private String m_timekey = null; 

    public entities(spark_context _spx, String _timekey){ 
     m_spx = _spx; 
     m_timekey = _timekey; 
    } 


    public DataFrame get_dimension(){ 
     if(m_entities == null) { 

      DataFrame df = m_spx.get_flat_data(m_timekey).select("event", "url"); 

      //UDF to generate hashed ids 
      UDF2 get_hashed_id = new UDF2<String, String, String>() { 
       public String call(String o, String o2) throws Exception { 
        return o.concat(o2); 
       } 
      }; 


      //UDF to clean the " from strings 
      UDF1 clean_string = new UDF1<String, String>() { 
       public String call(String o) throws Exception { 
        return o.replace("\"",""); 
       } 
      }; 


      //Get the Spark SQL Context from SC. 
      SQLContext sqlContext = new SQLContext(m_spx.sc()); 


      //Register the UDFs 
      sqlContext.udf().register("getid", get_hashed_id, DataTypes.StringType); 
      sqlContext.udf().register("clean_string", clean_string, DataTypes.StringType); 


      //Register the DF as a table. 
      sqlContext.registerDataFrameAsTable(df, "entities"); 
      m_entities = sqlContext.sql("SELECT getid(event, url) as event_key, clean_string(event) as event_cleaned, clean_string(url) as url_cleaned FROM entities"); 
     } 

     return m_entities; 
    } 
} 

答えて

4

あなたentitiesクラスはSparkContextメンバーが含まれている - それは、シリアライズすることはできません(SparkContextsがinterntionally直列化可能ではありません、あなたはそれらをシリアル化することになっていません)。 entities以来

はそれのいずれかが(彼らはそれらを保持しているentitiesインスタンスをシリアル化しようとするだろうから)非静的メソッド /メンバー/匿名内部クラスは、いずれかの直列化可能ではないですが、シリアライズ可能ではありません。そして、あなたがget_dimensionでそれらを使用することができます

private final static UDF2 get_hashed_id = new UDF2<String, String, String>() { 
    public String call(String o, String o2) throws Exception { 
     return o.concat(o2); 
    } 
}; 

private final static UDF1 clean_string = new UDF1<String, String>() { 
    public String call(String o) throws Exception { 
     return o.replace("\"",""); 
    } 
}; 

この場合、最適な回避策は、クラスの静的メンバーに匿名のUDFを抽出しています。

+0

ありがとうTzach、それは治療を働いた。他の人の助けを借りて私はまた追加しなければならなかった conf.setJars(新しいString [] {"/ app-integrationations/sparkworker_jar/sparkwork_java.jar"}); SparkConfには、実行中のボックスにあるjarファイルのパスが含まれています。 – gerrymcdev

関連する問題