2016-09-09 2 views
2

私はApache Flinkを使用してプロトタイプアプリケーションを作成しています。このプロセスでは、特定のユースケースに対してorg.apache.flink.streaming.api.functions.windowing.WindowFunctionを使用することを選択しました。しかし、apply()関数の本体を書いている間、私はこのエラーに直面しています(以下のコードは、私が書いているアプリケーションのものではありません - 私のデータ型は異なります)。Scala WindowFunctionがコンパイルされない

import scala.collection.Iterable 
import scala.collection.Map 
import org.apache.flink.streaming.api.functions.windowing.WindowFunction 
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow} 
import org.apache.flink.util.Collector 
import scala.collection.JavaConversions._ 

class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] { 

    def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = { 
    var count = 0L 
    for (in <- input) { 
     count = count + 1 
    } 
    out.collect(s"Window $window count: $count") 
    } 
} 

コンパイラは不平を言っている:

Error:(16, 7) class MyWindowFunction needs to be abstract, since method apply in trait WindowFunction of type 
(x$1: String, x$2: org.apache.flink.streaming.api.windowing.windows.TimeWindow, 
x$3: Iterable[(String, Long)], 
x$4: org.apache.flink.util.Collector[String])Unit is not defined 
    class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] { 

私はにパラメータの順序を確認している(適用)。彼らは正しいと思われる。

何らかの理由で、エラーの正確な原因を特定できません。誰かが私に解決策を教えてもらえますか?

答えて

4

このエラーの原因が見つかりました。

class MyWindowFunction extends 
     WindowFunction[(String, Long), String, String, TimeWindow] { 

    override 
    def apply(
     key: String, 
     w: TimeWindow, 
     iterable: Iterable[(String, Long)], // from java.lang.Iterable 
     collector: Collector[String]): Unit = { 

     // .... 
    } 
} 

だから、私は適切にインポートする必要がありました:

import java.lang.Iterable // From Java 
import java.util.Map  // From Java 

import org.apache.flink.streaming.api.functions.windowing.WindowFunction 
import org.apache.flink.streaming.api.windowing.windows.TimeWindow 
import org.apache.flink.util.Collector 

import scala.collection.JavaConversions._ // Implicit conversions 

class MyWindowFunction 
    extends WindowFunction[(String, Long), String, String, TimeWindow] { 

    override 
    def apply(
     key: String, 
     w: TimeWindow, 
     iterable: Iterable[(String, Long)], 
     collector: Collector[String]): Unit = { 

    // .... 

    } 
} 
アパッチFLINKのAPIは、代わりにそのScalaの同等の、java.lang.Iterableを期待しているという事実は、私にははっきりしていたではなかった何

すべてが順調でした。

関連する問題