2017-04-24 3 views
0

1つのDatastreamで2つのカフカトピック間の結合を行いたい。1つのデータストリームに2つのカフカトピックのイベントとルールを一致または結合する

実際、2つのデータストリームの結合には同じIDが必要です。 イベントはセンサからのデータで、ルールにはCEPでチェックされるルールが含まれています(ユーザーインターフェイスからのもの)。

これは私のテストですが動作しません。

DataStream<Object> evtAndRule=inputEventStream.join(rulesStream) 
      .where(new KeySelector<TrackEvent, Object>() { 
       @Override 
       public Object getKey(Event event) throws Exception { 
        return event.getId(); 
       } 
      }).equalTo(new KeySelector<RulesEvent, Object>() { 
       @Override 
       public Object getKey(RulesEvent rulesEvent) throws Exception { 
        return rulesEvent.getId(); 
       } 
      }).window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS))) 
      .apply(new FlatJoinFunction<TrackEvent, RulesEvent, Object>() { 
       @Override 
       public void join(TrackEvent trackEvent, RulesEvent rulesEvent, Collector<Object> collector) throws Exception { 
      .... 

       } 
      }); 

答えて

0

私はこれを試してみましたが、私は目的のルールを取得する方法がわからないと、これが最善の解決策

 DataStream<Tuple2<Event , RulesEvent>> evtAndRule= inputEventStream.map(new MapFunction<Event , Tuple2<Event , RulesEvent>>() { 
     @Override 
     public Tuple2<Event , RulesEvent> map(final Event event) throws Exception { 

      return new Tuple2<Event , RulesEvent>(event, new RulesEvent()); 
     } 
    }); 
ある場合
関連する問題