2015-11-26 9 views
8

完了時に複数の配列を戻すApache Spark SQLを使用して、Javaでユーザー定義集約関数(UDAF)を作成しようとしています。私はオンラインで検索しており、これを行う方法の例や提案は見つけられません。Apache Spark SQLのユーザー定義集計関数(UDAF)から複数の配列を戻す

1つの配列を返すことはできますが、複数の配列を返すevaluate()メソッドで正しい形式でデータを取得する方法を理解することはできません。

UDAFはevaluate()メソッドで配列を出力することができますが、これらの配列を呼び出しコードに戻す方法はわかりません(参考のために以下に示されています)。

UserDefinedAggregateFunction customUDAF = new CustomUDAF(); 
DataFrame resultingDataFrame = dataFrame.groupBy().agg(customUDAF.apply(dataFrame.col("long_col"), dataFrame.col("double_col"))).as("processed_data"); 

私は、全体のカスタムUDAFクラス以下に含まれているが、主要なメソッドは、データ型()と最初に示されている方法を()、評価します。

何か助けや助言をいただければ幸いです。ありがとうございました。

public class CustomUDAF extends UserDefinedAggregateFunction { 

    @Override 
    public DataType dataType() { 
     // TODO: Is this the correct way to return 2 arrays? 
     return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false)) 
      .add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false)); 
    } 

    @Override 
    public Object evaluate(Row buffer) { 
     // Data conversion 
     List<Long> longList = new ArrayList<Long>(buffer.getList(0)); 
     List<Double> dataList = new ArrayList<Double>(buffer.getList(1)); 

     // Processing of data (omitted) 

     // TODO: How to get data into format needed to return 2 arrays? 
     return dataList; 
    } 

    @Override 
    public StructType inputSchema() { 
     return new StructType().add("long", DataTypes.LongType).add("data", DataTypes.DoubleType); 
    } 

    @Override 
    public StructType bufferSchema() { 
     return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false)) 
      .add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false)); 
    } 

    @Override 
    public void initialize(MutableAggregationBuffer buffer) { 
     buffer.update(0, new ArrayList<Long>()); 
     buffer.update(1, new ArrayList<Double>()); 
    } 

    @Override 
    public void update(MutableAggregationBuffer buffer, Row row) { 
     ArrayList<Long> longList = new ArrayList<Long>(buffer.getList(0)); 
     longList.add(row.getLong(0)); 

     ArrayList<Double> dataList = new ArrayList<Double>(buffer.getList(1)); 
     dataList.add(row.getDouble(1)); 

     buffer.update(0, longList); 
     buffer.update(1, dataList); 
    } 

    @Override 
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) { 
     ArrayList<Long> longList = new ArrayList<Long>(buffer1.getList(0)); 
     longList.addAll(buffer2.getList(0)); 

     ArrayList<Double> dataList = new ArrayList<Double>(buffer1.getList(1)); 
     dataList.addAll(buffer2.getList(1)); 

     buffer1.update(0, longList); 
     buffer1.update(1, dataList); 
    } 

    @Override 
    public boolean deterministic() { 
     return true; 
    } 
} 

更新:zero323によって回答に基づいて私が使用して二つの配列を返すことができた:

return new Tuple2<>(longArray, dataArray); 

このうちデータを取得するには、闘争の少しだったが、データフレームを解体関与Javaリストに追加し、それをDataFrameに戻すことができます。

答えて

5

私がタプルを返すと言うことができる限り、十分であるはずです。スラングでは:

import org.apache.spark.sql.expressions._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.udf 
import org.apache.spark.sql.{Row, Column} 

object DummyUDAF extends UserDefinedAggregateFunction { 
    def inputSchema = new StructType().add("x", StringType) 
    def bufferSchema = new StructType() 
    .add("buff", ArrayType(LongType)) 
    .add("buff2", ArrayType(DoubleType)) 
    def dataType = new StructType() 
    .add("xs", ArrayType(LongType)) 
    .add("ys", ArrayType(DoubleType)) 
    def deterministic = true 
    def initialize(buffer: MutableAggregationBuffer) = {} 
    def update(buffer: MutableAggregationBuffer, input: Row) = {} 
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {} 
    def evaluate(buffer: Row) = (Array(1L, 2L, 3L), Array(1.0, 2.0, 3.0)) 
} 

val df = sc.parallelize(Seq(("a", 1), ("b", 2))).toDF("k", "v") 
df.select(DummyUDAF($"k")).show(1, false) 

// +---------------------------------------------------+ 
// |(DummyUDAF$(k),mode=Complete,isDistinct=false)  | 
// +---------------------------------------------------+ 
// |[WrappedArray(1, 2, 3),WrappedArray(1.0, 2.0, 3.0)]| 
// +---------------------------------------------------+ 
+0

私は(低い、平均、高)のタプルをUDAFからの信頼性の補間として返しています。このタプルを複数の列に分解する方法はありますか? '| key | [1.0,1.5,2.0] |' 'key | 1.0 | 1.5 | 2.0 |' – TomTom101

+0

@ TomTom101タプル(構造体フィールド)単純な選択で十分です。 – zero323

+0

驚くことに、そのトリックはあります!私は今、より読みやすくするためにケースクラスを返しました(投稿する前に試したはずです)。どうも! – TomTom101

関連する問題