2017-11-05 4 views
2

Flink:Monitoring the Wikipedia Edit Streamのクイックスタートの例に従っています。Flink:廃止予定の折り畳みを集計する方法

例はJavaであり、そして私は次のように、Scalaでそれを実装しています。しかし

/** 
* Wikipedia Edit Monitoring 
*/ 
object WikipediaEditMonitoring { 
    def main(args: Array[String]) { 
    // set up the execution environment 
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource) 

    val result = edits.keyBy(_.getUser) 
     .timeWindow(Time.seconds(5)) 
     .fold(("", 0L)) { 
     (acc: (String, Long), event: WikipediaEditEvent) => { 
      (event.getUser, acc._2 + event.getByteDiff) 
     } 
     } 

    result.print 

    // execute program 
    env.execute("Wikipedia Edit Monitoring") 
    } 
} 

、FLINKでfold機能がを非推奨すでにあり、かつaggregate機能が推奨されます。

enter image description here

しかし、私はaggregrateに廃止予定foldを変換する方法についての例やチュートリアルを見つけることができませんでした。

これを行うにはどのように任意のアイデア?おそらくaggregrateを適用するだけではありません。

UPDATE

私は、次のように別の実装があります。

/** 
* Wikipedia Edit Monitoring 
*/ 
object WikipediaEditMonitoring { 
    def main(args: Array[String]) { 
    // set up the execution environment 
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource) 

    val result = edits 
     .map(e => UserWithEdits(e.getUser, e.getByteDiff)) 
     .keyBy("user") 
     .timeWindow(Time.seconds(5)) 
     .sum("edits") 

    result.print 

    // execute program 
    env.execute("Wikipedia Edit Monitoring") 
    } 

    /** Data type for words with count */ 
    case class UserWithEdits(user: String, edits: Long) 
} 

を私はまた、自己定義AggregateFunctionを使用して実装を持ってする方法を知っていただきたいと思います。

UPDATE

私は、このマニュアルに従っ:AggregateFunctionが、次の質問があります。

:リリース1.3の​​ためのインタフェース AggregateFunctionのソースコードでは

を、あなたはaddが実際voidを返す表示されます

void add(IN value, ACC accumulator); 

バージョン1.4の場合、AggregateFunctionは返されます:

ACC add(IN value, ACC accumulator); 

これはどのように処理すればよいですか?

私が使用しているFlinkのバージョンは1.3.2であり、このバージョンのドキュメントにはAggregateFunctionが含まれていませんが、アーティファクトのリリース1.4はまだありません。

enter image description here

答えて

3

あなたは例を含む、AggregateFunctionin the Flink 1.4 docsのためのいくつかのドキュメントを検索します。

1.3.2に含まれるバージョンは、追加操作によってアキュムレータが変更される、変更可能なアキュムレータタイプで使用することに限定されています。これはfixed for Flink 1.4でしたが、リリースされていません。

+0

新しい更新をご覧ください – fluency03

1
import org.apache.flink.api.common.functions.AggregateFunction 
import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.api.common.serialization.SimpleStringSchema 
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08 
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource} 

class SumAggregate extends AggregateFunction[WikipediaEditEvent, (String, Int), (String, Int)] { 
    override def createAccumulator() = ("", 0) 

    override def add(value: WikipediaEditEvent, accumulator: (String, Int)) = (value.getUser, value.getByteDiff + accumulator._2) 

    override def getResult(accumulator: (String, Int)) = accumulator 

    override def merge(a: (String, Int), b: (String, Int)) = (a._1, a._2 + b._2) 
} 

object WikipediaAnalysis extends App { 
    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 
    val edits: DataStream[WikipediaEditEvent] = see.addSource(new WikipediaEditsSource()) 

    val result: DataStream[(String, Int)] = edits 
    .keyBy(_.getUser) 
    .timeWindow(Time.seconds(5)) 
    .aggregate(new SumAggregate) 
// .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff)) 
    result.print() 

    result.map(_.toString()).addSink(new FlinkKafkaProducer08[String]("localhost:9092", "wiki-result", new SimpleStringSchema())) 
    see.execute("Wikipedia User Edit Volume") 
} 
関連する問題