2016-11-03 7 views
0

Apache Flinkを使用しています。私はApache Kafkaソースからデータを読み込んで、DataStreamを変換する必要があります。Apache Flink - エラー:メソッドの適用が引数(WindowFunction)に適用されません

DataStream<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> dataStream = 
        env 
        .addSource(new FlinkKafkaConsumer08<>(
            parameterTool.getRequired("topic"), 
            new SimpleStringSchema(), 
            parameterTool.getProperties())) 
        .flatMap(new SplitIntoRecordsString()) 
        .flatMap(new SplitIntoTuples()) 
        .keyBy(1) 
        .countWindow(5) 
        .apply(new windowApplyFunction()); 

    public class windowApplyFunction implements WindowFunction< 
                  Tuple8<Double, Double, String, Double, Double, Double, Double, Double>, 
                  String, 
                  Double, 
                  Window>{ 

    public void apply(Double key, Window window, 
      Iterable<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> values, 
      Collector<String> out) 
      throws Exception {  
     out.collect("MyResult"); 
    } 
} 

残念ながら、私は次のエラーを持って、それを修正する方法がわからない:I場合

The method apply(WindowFunction<Tuple8<Double,Double,String,Double,Double,Double,Double,Double>,R,Tuple,GlobalWindow>) in the type WindowedStream<Tuple8<Double,Double,String,Double,Double,Double,Double,Double>,Tuple,GlobalWindow> is not applicable for the arguments (FlinkManager.windowApplyFunction) 

すべてが正常に動作します私はWindowFunctionを適用しようとする最後のステップで

apply(new windowApplyFunction())をあらかじめ定義された関数に置き換えます。 sum(1)

答えて

0

あなたWindowFunction

WindowFunction< 
    Tuple8<Double, Double, String, Double, Double, Double, Double, Double>, 
    String, 
    Double, 
    GlobalWindow> 

countWindow()戻りGlobalWindow型の型でなければなりません。

お試しください。

0

ヒントvanekjarのおかげで!この不具合を修正した後、私は別の小さなものを変更し、今すぐ動作します! 正しいコード:

public static class windowApplyFunction implements WindowFunction< 
                  Tuple8<Double, Double, String, Double, Double, Double, Double, Double>, 
                  Tuple8<Double, Double, String, Double, Double, Double, Double, Double>, 
                  Tuple, 
                  GlobalWindow>{ 

    public void apply(Tuple key, GlobalWindow window, 
      Iterable<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> values, 
      Collector<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> out) 
      throws Exception {  
     out.collect(new Tuple8<Double, Double, String, Double, Double, Double, Double, Double>()); 
    } 
} 
関連する問題