2016-11-04 21 views
0

私は2つのフォークを持つストリームを持っているので、2つのSplitStreamがあります。ここでFlink出力セレクタに異常な動作があります

はコードです:

static final class MyOutputSelector1 implements OutputSelector<Long> { 

    @Override 
    public Iterable<String> select(Long value) { 
     List<String> outputs = new ArrayList<>(); 
     if (value < 5) { 
      outputs.add("valid1"); 
     } 
     else { 
      outputs.add("error1"); 
     } 
     return outputs; 
    } 
} 

static final class MyOutputSelector2 implements OutputSelector<Long> { 
    private static final long serialVersionUID = 1L; 

    @Override 
    public Iterable<String> select(Long value) { 
     List<String> outputs = new ArrayList<String>(); 
     if (value == 2) { 
      outputs.add("valid2"); 
     } 
     else { 
      outputs.add("error2"); 
     } 
     return outputs; 
    } 
} 

@Test 
public void outputSelectorTest() throws Exception { 
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setParallelism(1); 


    SplitStream<Long> split1 = env.generateSequence(1, 11).split(new MyOutputSelector1()); 
    DataStream<Long> stream11 = split1.select("valid1"); 
    stream11.print(); 

    SplitStream<Long> split2 = stream11.split(new MyOutputSelector2()); 
    DataStream<Long> stream21 = split2.select("valid2"); 
    stream21.print(); 
    DataStream<Long> stream22 = split2.select("error2"); 
    stream22.printToErr(); 

    env.execute(); 
} 

そして、ここでは、私はこのコードを実行すると、私が手入力されている:

Program output

私のソースは1と11の間の整数のリストです。 私はstream11が5未満の整数しか含まないと思っています。それは私がそれを印刷すると大丈夫と思われます。 私はstream21が2を含んでいると思います.2つの "2"が印刷されているようです。 しかし、私はstream22が2以外の5未満のすべての整数を含むと予想しますが、1と11の間のすべての整数が出力されます。

なぜこのように動作しますか?私は最初のセレクタがストリーム内で1から4までの整数だけを保持していると思ったが、最後の分割後に5から11の整数が再び現れる。

要約すると、ここに私が得ているものと期待するものがある:

Diagram

はおそらく、私は理解していないメカニズムがあります。解決策はありますか?代わりにフィルタを使用する必要がありますか?

ありがとうございました。

答えて

0

あなたがバグを見つけたようです。私はFlink 1.1.3と現在のマスターブランチ(Flink 1.2-SNAPSHOT)で問題を再現できました。

バグを追跡するために、私はJIRAの問題:FLINK-5031を提出しました。

関連する問題