2017-07-31 1 views
0

私はApache Flink(1.3.1)で作業しているときに初めて質問をします。より詳しくは、私はflink-core、flink-cep、flink-streamingライブラリを扱っています。私のアプリケーションはAkka ActorSystemで、RabbitMQからのメッセージを消費し、さまざまなアクターがこのメッセージを処理します。いくつかのアクターでは、FlinkからStreamExecutionEnvironmentをインスタンス化して、着信メッセージを処理したいと思っています。したがって、RichSourceFunctionクラスを拡張するカスタムソースクラスを作成しました。 1つのことを除いてすべてが動作します:私はFlink拡張機能にデータを送信する方法がわかりません。ここに私のセットアップは次のとおりです。カスタムソースからデータを連続的に書き出します。

public class FlinkExtension { 

    private static StreamExecutionEnvironment environment; 
    private DataStream<ValueEvent> input; 
    private CustomSourceFunction function; 

    public FlinkExtension(){ 

     environment = StreamExecutionEnvironment.getExecutionEnvironment(); 

     function = new CustomSourceFunction(); 
     input = environment.addSource(function); 

     PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern()); 

     DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() { 
      @Override 
      public String select(Map<String, List<ValueEvent>> pattern) throws Exception { 
       return null; //TODO 
      } 
     }); 

     warnings.print(); 

     try { 
      environment.execute(); 
     } catch(Exception e){ 
      e.printStackTrace(); 
     } 

    } 

    private Pattern<ValueEvent, ?> _pattern(){ 

     return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() { 
      @Override 
      public boolean filter(ValueEvent value) throws Exception { 
       return value.getValue() > 10; 
      } 
     }); 
    } 

    public void sendData(ValueEvent value){ 
     function.sendData(value); 
    } 
} 

そして、これは私のカスタムソース関数である:

public class CustomSourceFunction extends RichSourceFunction<ValueEvent> { 

    private volatile boolean run = false; 
    private SourceContext<ValueEvent> context; 

    @Override 
    public void open(Configuration parameters){ 
     run = true; 
    } 

    @Override 
    public void run(SourceContext<ValueEvent> ctx) throws Exception { 
     this.context = ctx; 

     while (run){ 

     } 
    } 

    public void sendData(ValueEvent value){ 
     this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis()); 
    } 

    @Override 
    public void cancel() { 
     run = false; 
    } 
} 

だから私は、私への連続的な方法でデータを書き込むために外部からの私のFlinkExtensionクラスのメソッドsendDataを呼び出したいですFlinkExtension。私のJUnitテストでは、データをエクステンションに送信してから、SourceContextにデータを書き込む必要があります。

@Test 
public void testSendData(){ 
    FlinkExtension extension = new FlinkExtension(); 
    extension.sendData(new ValueEvent(30)); 
} 

しかし、私は、テストを実行した場合、何も起こりません、アプリケーションがCustomSourceFunctionのrunメソッドでハングアップします。私はまた、CustomSourceFunction runメソッドで新しいエンドレススレッドを作成しようとしました。

要約:誰かが、アプリケーションからのデータをFlinkインスタンスに連続的に書き込む方法を知っていますか?

答えて

1

flinkソースコネクタは、run()メソッドがwhile(run)ループ内でcollect()(またはcollectWithTimestamp())を呼び出すことによって、連続したデータストリームを生成します。例を勉強したいのであれば、Apache NiFiソースはほとんどの場合ほど複雑ではありません。 here's its run method

関連する問題