2016-07-28 6 views
0

次のようにApache Flinkを使用したいと思います。私は別の流れのデータによって豊かにされなければならない1つのメインストリームを持っています。このメインストリームには、属性「site」と「timestamp」を持つ要素があります。他のストリーム(それをcountrystreamと呼ぶ)には、属性「サイト」と「国」があります。 countrystreamは、サイトに使用されている最新の国を追跡する必要があります。たとえば、("klm.com", "netherlands")が最初に到着し、しばらくしてタプル("klm.com", "france")が到着した場合、「klm.com」は「france」(これは後者であるため)を指す必要があります。したがって、状態を維持する必要があります。タプル( "klm.com"、100)がメインストリームに到着したとします。これで、("klm.com", 100, "france")に充実しているはずです。いくつかのサイトがcountrystreamに見つからない場合は、 "?"で強調してください。たとえば、("stackoverflow.com", 150, "?")です。どのように私はこれを達成することができますか?ストリームを別のストリームで強化する

答えて

0

解決策が見つかりました(しばらく時間がかかりました)。これは効率的ですか?それは改善することができますか?私の反復ストリームにチェックポイントを置くことができないということですか?

val env = StreamExecutionEnvironment.getExecutionEnvironment 

val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c") 
val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A")) 
     .iterate(
      iteration => { 
       (iteration, iteration) 
      } 
     ) 

mainStream 
    .coGroup(infoStream) 
     .where[String]((x: String) => x) 
     .equalTo(_._2) 
     .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) { 
      (first: Iterator[String], second: Iterator[(Int, String, String)], out: Collector[(String, String)]) => { 
       first.foreach((key: String) => { 
         val matchingRecords = second 
          .filter(_._2 == key) 
         if (matchingRecords.nonEmpty) { 
          val matchingRecord = matchingRecords.maxBy(_._1) 
          out.collect((matchingRecord._2, matchingRecord._3)) 
         } 
        } 
       ) 
      } 
     } 
    .print() 

env.execute("proof_of_concept") 
関連する問題