私は2つのフォークを持つストリームを持っているので、2つのSplitStreamがあります。ここでFlink出力セレクタに異常な動作があります
はコードです:
static final class MyOutputSelector1 implements OutputSelector<Long> {
@Override
public Iterable<String> select(Long value) {
List<String> outputs = new ArrayList<>();
if (value < 5) {
outputs.add("valid1");
}
else {
outputs.add("error1");
}
return outputs;
}
}
static final class MyOutputSelector2 implements OutputSelector<Long> {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> select(Long value) {
List<String> outputs = new ArrayList<String>();
if (value == 2) {
outputs.add("valid2");
}
else {
outputs.add("error2");
}
return outputs;
}
}
@Test
public void outputSelectorTest() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SplitStream<Long> split1 = env.generateSequence(1, 11).split(new MyOutputSelector1());
DataStream<Long> stream11 = split1.select("valid1");
stream11.print();
SplitStream<Long> split2 = stream11.split(new MyOutputSelector2());
DataStream<Long> stream21 = split2.select("valid2");
stream21.print();
DataStream<Long> stream22 = split2.select("error2");
stream22.printToErr();
env.execute();
}
そして、ここでは、私はこのコードを実行すると、私が手入力されている:
私のソースは1と11の間の整数のリストです。 私はstream11が5未満の整数しか含まないと思っています。それは私がそれを印刷すると大丈夫と思われます。 私はstream21が2を含んでいると思います.2つの "2"が印刷されているようです。 しかし、私はstream22が2以外の5未満のすべての整数を含むと予想しますが、1と11の間のすべての整数が出力されます。
なぜこのように動作しますか?私は最初のセレクタがストリーム内で1から4までの整数だけを保持していると思ったが、最後の分割後に5から11の整数が再び現れる。
要約すると、ここに私が得ているものと期待するものがある:
はおそらく、私は理解していないメカニズムがあります。解決策はありますか?代わりにフィルタを使用する必要がありますか?
ありがとうございました。