2016-08-30 5 views
1

データストリームの要素を格納するためにサイズ2のFIFOキューを使用します。いずれにしても、ストリームに入っている前の要素が必要ですが、現在の要素は必要ありません。これを行うには、ストリームコードの外にキューを作成し、現在の要素をキューに入れています。私のキューに2つの要素がある場合は、その要素をデキューして最初の要素を使用します。Flinkのグローバル変数

私が直面している問題は、ストリームコードの外に宣言されているため、キューをエンキューできないことです。ストリーミングは複数のJVMを使用し、キューは1つのJVMで宣言されるからです。

以下

はサンプルコードです:ここで

val queue = Queue[Array[Double]]() //Global Queue 

val ws = dataStream.map(row => { 
    queue.enqueue(row) 
    println(queue.size) //Prints 0 always 
    if(queue.size == 2){ 
     result = operate(queue(0)) 
     queue.dequeue 
    } 
    result 
}) 

、何もキューに入れない取得され、キューのサイズは常に0 である我々は、すべてに分散されているFLINKでグローバル変数を作成することができる方法はありますJVMは?そうでない場合、このロジックを実装する他の方法はありますか?

答えて

0

驚いたことに、QueueをScala Listに置き換えたときに十分でした。