完了時に複数の配列を戻すApache Spark SQLを使用して、Javaでユーザー定義集約関数(UDAF)を作成しようとしています。私はオンラインで検索しており、これを行う方法の例や提案は見つけられません。Apache Spark SQLのユーザー定義集計関数(UDAF)から複数の配列を戻す
1つの配列を返すことはできますが、複数の配列を返すevaluate()メソッドで正しい形式でデータを取得する方法を理解することはできません。
UDAFはevaluate()メソッドで配列を出力することができますが、これらの配列を呼び出しコードに戻す方法はわかりません(参考のために以下に示されています)。
UserDefinedAggregateFunction customUDAF = new CustomUDAF();
DataFrame resultingDataFrame = dataFrame.groupBy().agg(customUDAF.apply(dataFrame.col("long_col"), dataFrame.col("double_col"))).as("processed_data");
私は、全体のカスタムUDAFクラス以下に含まれているが、主要なメソッドは、データ型()と最初に示されている方法を()、評価します。
何か助けや助言をいただければ幸いです。ありがとうございました。
public class CustomUDAF extends UserDefinedAggregateFunction {
@Override
public DataType dataType() {
// TODO: Is this the correct way to return 2 arrays?
return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false))
.add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false));
}
@Override
public Object evaluate(Row buffer) {
// Data conversion
List<Long> longList = new ArrayList<Long>(buffer.getList(0));
List<Double> dataList = new ArrayList<Double>(buffer.getList(1));
// Processing of data (omitted)
// TODO: How to get data into format needed to return 2 arrays?
return dataList;
}
@Override
public StructType inputSchema() {
return new StructType().add("long", DataTypes.LongType).add("data", DataTypes.DoubleType);
}
@Override
public StructType bufferSchema() {
return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false))
.add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false));
}
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, new ArrayList<Long>());
buffer.update(1, new ArrayList<Double>());
}
@Override
public void update(MutableAggregationBuffer buffer, Row row) {
ArrayList<Long> longList = new ArrayList<Long>(buffer.getList(0));
longList.add(row.getLong(0));
ArrayList<Double> dataList = new ArrayList<Double>(buffer.getList(1));
dataList.add(row.getDouble(1));
buffer.update(0, longList);
buffer.update(1, dataList);
}
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
ArrayList<Long> longList = new ArrayList<Long>(buffer1.getList(0));
longList.addAll(buffer2.getList(0));
ArrayList<Double> dataList = new ArrayList<Double>(buffer1.getList(1));
dataList.addAll(buffer2.getList(1));
buffer1.update(0, longList);
buffer1.update(1, dataList);
}
@Override
public boolean deterministic() {
return true;
}
}
更新:zero323によって回答に基づいて私が使用して二つの配列を返すことができた:
return new Tuple2<>(longArray, dataArray);
このうちデータを取得するには、闘争の少しだったが、データフレームを解体関与Javaリストに追加し、それをDataFrameに戻すことができます。
私は(低い、平均、高)のタプルをUDAFからの信頼性の補間として返しています。このタプルを複数の列に分解する方法はありますか? '| key | [1.0,1.5,2.0] |' 'key | 1.0 | 1.5 | 2.0 |' – TomTom101
@ TomTom101タプル(構造体フィールド)単純な選択で十分です。 – zero323
驚くことに、そのトリックはあります!私は今、より読みやすくするためにケースクラスを返しました(投稿する前に試したはずです)。どうも! – TomTom101