2016-04-12 5 views
2

まず、データセットAPIを使用して静的データを操作してから、DataStream APIを使用してストリーミングジョブを実行します。私がIDEにコードを書くと、それは完全に動作します。しかし、ローカルのFlinkジョブマネージャ(すべての並列処理1)を実行しようとすると、ストリーミングコードは決して実行されません!Flink:データセットとDatastream APIを1つのプログラムにまとめました。出来ますか?

たとえば、次のコードは動作していない:私はこのことが作業を取得しようとする必要があり

val parallelism = 1 

val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.setParallelism(parallelism) 

val envStatic = ExecutionEnvironment.getExecutionEnvironment 
envStatic.setParallelism(parallelism) 

val myStaticData = envStatic.fromCollection(1 to 10) 
val myVal: Int = myStaticData.reduce(_ + _).collect().head 

val theStream = env.fromElements(1).iterate(iteretion => { 
    val result = iteretion.map(x => x + myVal) 
    (result, result) 
}) 
theStream.print() 
env.execute("static and streaming together") 

何?

ログ:execution logs for above program

実行計画:plan 巡回しているようです。

+0

ログは何を表していますか? –

+0

@TillRohrmannリンクが追加されました。 –

+0

クライアントログには何が表示されますか? –

答えて

3

Flinkジョブが複数のサブジョブで構成されている場合(例: countcollectまたはprintによってトリガーされた場合、Webインターフェイス経由でジョブを送信できません。 Webインターフェイスは単一のFlinkジョブのみをサポートします。

関連する問題