Cassandra & DataStaxコミュニティ、私は誰かが賢明な人を助けてくれると願っています。Spark SQL UDFタスクがシリアル化されていない
アサーションコードをHadoopからSparkに移行し、Cassandra(DataStax Enterprise経由)上で実行しています。生産では4.7、開発では4.8でした。
Java 7の開発環境では、Java 7/8が開発中です。
必要なDataFrame変換がいくつかあります。メモリ内のDataFrameに対してSpark SQLContext経由で使用されるUDFを記述すると、その作業が実行されると考えられます。これらの主なものは次のとおりです。
- 私たちのデータのすべての単一のテキスト値は接頭辞付きで後に "。つまり、「いくつかのデータ」これは非常に面倒なので、これらのそれぞれをクリーニングしたいと思います。
- 多くの他の列から構成されたハッシュキーを含む列を追加したいとします。
コードは次のとおりです。これはsqlContextにUDF呼び出しを含めることなくうまく実行されますが、追加されるとすぐに「タスクがシリアライズできません」というエラーが表示されます
スレッド「main」の例外org.apache.spark.SparkException:シリアライズ可能
私はこのクラスの基本クラスとして "implements Serializable"を入れてみましたが、これはエラークラスをチェーンの次のものに変更しますが、Exceptionクラスでは失敗するシリアライズ可能ではありません...おそらく私は間違った方向に向かっています。
また、ラムダとしてUDFを実装しようとしましたが、同じエラーが発生します。
誰かが私が間違っていることを指摘できたら、それは非常に感謝しています!
public class entities implements Serializable{
private spark_context m_spx = null;
private DataFrame m_entities = null;
private String m_timekey = null;
public entities(spark_context _spx, String _timekey){
m_spx = _spx;
m_timekey = _timekey;
}
public DataFrame get_dimension(){
if(m_entities == null) {
DataFrame df = m_spx.get_flat_data(m_timekey).select("event", "url");
//UDF to generate hashed ids
UDF2 get_hashed_id = new UDF2<String, String, String>() {
public String call(String o, String o2) throws Exception {
return o.concat(o2);
}
};
//UDF to clean the " from strings
UDF1 clean_string = new UDF1<String, String>() {
public String call(String o) throws Exception {
return o.replace("\"","");
}
};
//Get the Spark SQL Context from SC.
SQLContext sqlContext = new SQLContext(m_spx.sc());
//Register the UDFs
sqlContext.udf().register("getid", get_hashed_id, DataTypes.StringType);
sqlContext.udf().register("clean_string", clean_string, DataTypes.StringType);
//Register the DF as a table.
sqlContext.registerDataFrameAsTable(df, "entities");
m_entities = sqlContext.sql("SELECT getid(event, url) as event_key, clean_string(event) as event_cleaned, clean_string(url) as url_cleaned FROM entities");
}
return m_entities;
}
}
ありがとうTzach、それは治療を働いた。他の人の助けを借りて私はまた追加しなければならなかった conf.setJars(新しいString [] {"/ app-integrationations/sparkworker_jar/sparkwork_java.jar"}); SparkConfには、実行中のボックスにあるjarファイルのパスが含まれています。 – gerrymcdev